You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/26 08:27:58 UTC

[6/8] flink git commit: [FLINK-6708] [yarn] Harden FlinkYarnSessionCli to handle GetClusterStatusResponse exceptions

[FLINK-6708] [yarn] Harden FlinkYarnSessionCli to handle GetClusterStatusResponse exceptions

This PR hardens the FlinkYarnSessionCli by handling exceptions which occur when
retrieving the GetClusterStatusResponse. If no such response is retrieved and instead
an exception is thrown, the Cli won't fail but retry it the next time.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a77867a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a77867a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a77867a

Branch: refs/heads/master
Commit: 2a77867a7980f888e1d3e9700697625e6a8429ad
Parents: 6429e59
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 24 18:26:57 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:27:18 2017 +0800

----------------------------------------------------------------------
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java  | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2a77867a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 1ece264..d2a4340 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -413,14 +413,18 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			while (true) {
 				// ------------------ check if there are updates by the cluster -----------
 
-				GetClusterStatusResponse status = yarnCluster.getClusterStatus();
-				LOG.debug("Received status message: {}", status);
+				try {
+					GetClusterStatusResponse status = yarnCluster.getClusterStatus();
+					LOG.debug("Received status message: {}", status);
 
-				if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
-					System.err.println("Number of connected TaskManagers changed to " +
+					if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
+						System.err.println("Number of connected TaskManagers changed to " +
 							status.numRegisteredTaskManagers() + ". " +
-						"Slots available: " + status.totalNumberOfSlots());
-					numTaskmanagers = status.numRegisteredTaskManagers();
+							"Slots available: " + status.totalNumberOfSlots());
+						numTaskmanagers = status.numRegisteredTaskManagers();
+					}
+				} catch (Exception e) {
+					LOG.warn("Could not retrieve the current cluster status. Retrying...", e);
 				}
 
 				List<String> messages = yarnCluster.getNewMessages();