You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/06/24 16:56:06 UTC
[2/6] flink git commit: [FLINK-3667] additional cleanups in YarnClusterClient
[FLINK-3667] additional cleanups in YarnClusterClient
- remove ActorRunner thread, print status in finalizeCluster instead
- prevent premature shutdown of actor system in shutdown method
- prevent timeout exceptions due to poisoning the ApplicationClient
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b593632
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b593632
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b593632
Branch: refs/heads/master
Commit: 3b593632dd162d951281fab8a8ed8c6bc2b07b39
Parents: 6420c1c
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Jun 23 10:47:10 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Jun 24 17:00:34 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 2 +-
.../flink/client/program/ClusterClient.java | 14 +-
.../apache/flink/yarn/YarnClusterClient.java | 135 ++++++-------------
3 files changed, 51 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3b593632/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index a01ab53..5c4791b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -153,7 +153,7 @@ public class CliFrontend {
// load the configuration
LOG.info("Trying to load configuration file");
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
- System.setProperty("FLINK_CONF_DIR", configDirectory.getAbsolutePath());
+ System.setProperty(ENV_CONFIG_DIRECTORY, configDirectory.getAbsolutePath());
this.config = GlobalConfiguration.getConfiguration();
http://git-wip-us.apache.org/repos/asf/flink/blob/3b593632/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index def9578..12a7a39 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -166,12 +166,14 @@ public abstract class ClusterClient {
* Shuts down the client. This stops the internal actor system and actors.
*/
public void shutdown() {
- try {
- finalizeCluster();
- } finally {
- if (!this.actorSystem.isTerminated()) {
- this.actorSystem.shutdown();
- this.actorSystem.awaitTermination();
+ synchronized (this) {
+ try {
+ finalizeCluster();
+ } finally {
+ if (!this.actorSystem.isTerminated()) {
+ this.actorSystem.shutdown();
+ this.actorSystem.awaitTermination();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b593632/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index b603294..9c77a8a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -21,7 +21,6 @@ import akka.actor.ActorRef;
import static akka.pattern.Patterns.ask;
-import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
@@ -72,7 +71,7 @@ public class YarnClusterClient extends ClusterClient {
private static final int POLLING_THREAD_INTERVAL_MS = 1000;
private YarnClient yarnClient;
- private Thread actorRunner;
+
private Thread clientShutdownHook = new ClientShutdownHook();
private PollingThread pollingRunner;
private final Configuration hadoopConfig;
@@ -144,36 +143,6 @@ public class YarnClusterClient extends ClusterClient {
leaderRetrievalService),
"applicationClient");
- actorRunner = new Thread(new Runnable() {
- @Override
- public void run() {
- // blocks until ApplicationClient has been stopped
- actorSystem.awaitTermination();
-
- // get final application report
- try {
- ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-
- LOG.info("Application " + appId + " finished with state " + appReport
- .getYarnApplicationState() + " and final state " + appReport
- .getFinalApplicationStatus() + " at " + appReport.getFinishTime());
-
- if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState()
- == YarnApplicationState.KILLED) {
- LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
- LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve "
- + "the full application log using this command:\n"
- + "\tyarn logs -applicationId " + appReport.getApplicationId() + "\n"
- + "(It sometimes takes a few seconds until the logs are aggregated)");
- }
- } catch (Exception e) {
- LOG.warn("Error while getting final application report", e);
- }
- }
- });
- actorRunner.setDaemon(true);
- actorRunner.start();
-
pollingRunner = new PollingThread(yarnClient, appId);
pollingRunner.setDaemon(true);
pollingRunner.start();
@@ -211,10 +180,19 @@ public class YarnClusterClient extends ClusterClient {
}
}
+ /**
+ * Disconnect from the Yarn cluster
+ */
public void disconnect() {
+
+ if (hasBeenShutDown.getAndSet(true)) {
+ return;
+ }
+
if(!isConnected) {
throw new IllegalStateException("Can not disconnect from an unconnected cluster.");
}
+
LOG.info("Disconnecting YarnClusterClient from ApplicationMaster");
try {
@@ -223,15 +201,6 @@ public class YarnClusterClient extends ClusterClient {
// we are already in the shutdown hook
}
- // tell the actor to shut down.
- applicationClient.tell(PoisonPill.getInstance(), applicationClient);
-
- try {
- actorRunner.join(1000); // wait for 1 second
- } catch (InterruptedException e) {
- LOG.warn("Shutdown of the actor runner was interrupted", e);
- Thread.currentThread().interrupt();
- }
try {
pollingRunner.stopRunner();
pollingRunner.join(1000);
@@ -239,6 +208,7 @@ public class YarnClusterClient extends ClusterClient {
LOG.warn("Shutdown of the polling runner was interrupted", e);
Thread.currentThread().interrupt();
}
+
isConnected = false;
}
@@ -278,23 +248,7 @@ public class YarnClusterClient extends ClusterClient {
if (isDetached()) {
return super.runDetached(jobGraph, classLoader);
} else {
- try {
- return super.run(jobGraph, classLoader);
- } finally {
- // show cluster status
- List<String> msgs = getNewMessages();
- if (msgs != null && msgs.size() > 1) {
-
- logAndSysout("The following messages were created by the YARN cluster while running the Job:");
- for (String msg : msgs) {
- logAndSysout(msg);
- }
- }
- if (getApplicationStatus() != ApplicationStatus.SUCCEEDED) {
- logAndSysout("YARN cluster is in non-successful state " + getApplicationStatus());
- logAndSysout("YARN Diagnostics: " + getDiagnostics());
- }
- }
+ return super.run(jobGraph, classLoader);
}
}
@@ -369,35 +323,18 @@ public class YarnClusterClient extends ClusterClient {
}
}
+ @Override
+ public List<String> getNewMessages() {
- private String getDiagnostics() {
- if(!isConnected) {
- throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
- }
-
- if (getApplicationStatus() == ApplicationStatus.SUCCEEDED) {
- LOG.warn("getDiagnostics() called for cluster which is not in failed state");
- }
- ApplicationReport lastReport = pollingRunner.getLastReport();
- if (lastReport == null) {
- LOG.warn("Last report is null");
- return null;
- } else {
- return lastReport.getDiagnostics();
+ if(hasBeenShutdown()) {
+ throw new RuntimeException("The YarnClusterClient has already been stopped");
}
- }
- @Override
- public List<String> getNewMessages() {
if(!isConnected) {
throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
}
- if(hasBeenShutdown()) {
- throw new RuntimeException("The YarnClusterClient has already been stopped");
- }
List<String> ret = new ArrayList<String>();
-
// get messages from ApplicationClient (locally)
while(true) {
Object result;
@@ -443,7 +380,6 @@ public class YarnClusterClient extends ClusterClient {
*/
@Override
public void finalizeCluster() {
-
if (isDetached() || !perJobCluster) {
// only disconnect if we are not running a per job cluster
disconnect();
@@ -452,14 +388,17 @@ public class YarnClusterClient extends ClusterClient {
}
}
+ /**
+ * Shuts down the Yarn application
+ */
public void shutdownCluster() {
- if (!isConnected) {
- throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster.");
+ if (hasBeenShutDown.getAndSet(true)) {
+ return;
}
- if(hasBeenShutDown.getAndSet(true)) {
- return;
+ if (!isConnected) {
+ throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster.");
}
try {
@@ -481,9 +420,6 @@ public class YarnClusterClient extends ClusterClient {
LOG.warn("Error while stopping YARN Application Client", e);
}
}
-
- actorSystem.shutdown();
- actorSystem.awaitTermination();
}
try {
@@ -513,12 +449,6 @@ public class YarnClusterClient extends ClusterClient {
}
try {
- actorRunner.join(1000); // wait for 1 second
- } catch (InterruptedException e) {
- LOG.warn("Shutdown of the actor runner was interrupted", e);
- Thread.currentThread().interrupt();
- }
- try {
pollingRunner.stopRunner();
pollingRunner.join(1000);
} catch(InterruptedException e) {
@@ -526,6 +456,25 @@ public class YarnClusterClient extends ClusterClient {
Thread.currentThread().interrupt();
}
+ try {
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+
+ LOG.info("Application " + appId + " finished with state " + appReport
+ .getYarnApplicationState() + " and final state " + appReport
+ .getFinalApplicationStatus() + " at " + appReport.getFinishTime());
+
+ if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState()
+ == YarnApplicationState.KILLED) {
+ LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
+ LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve "
+ + "the full application log using this command:\n"
+ + "\tyarn logs -applicationId " + appReport.getApplicationId() + "\n"
+ + "(It sometimes takes a few seconds until the logs are aggregated)");
+ }
+ } catch (Exception e) {
+ LOG.warn("Couldn't get final report", e);
+ }
+
LOG.info("YARN Client is shutting down");
yarnClient.stop(); // actorRunner is using the yarnClient.
yarnClient = null; // set null to clearly see if somebody wants to access it afterwards.