You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/11/11 16:36:56 UTC

[6/7] flink git commit: [FLINK-2994] [client] Report error cause when jobs switch to failing.

[FLINK-2994] [client] Report error cause when jobs switch to failing.

For jobs that do not switch to FAILED, but rather RESTARTING, this now prints the error cause
as well. Also minor improvement to exception printing in CliFrontend.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/976bacc6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/976bacc6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/976bacc6

Branch: refs/heads/master
Commit: 976bacc65908cc35382ddbbcdc80249a700bc2d3
Parents: 0de13b5
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Nov 10 19:42:58 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 10 19:42:58 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 18 +++++----------
 .../flink/runtime/client/JobClientActor.java    | 24 +++++++++++++++++---
 2 files changed, 27 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/976bacc6/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 2211012..933a22c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -125,8 +125,6 @@ public class CliFrontend {
 
 	private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);
 
-	private final File configDirectory;
-
 	private final Configuration config;
 
 	private final FiniteDuration askTimeout;
@@ -155,12 +153,12 @@ public class CliFrontend {
 	public CliFrontend(String configDir) throws Exception {
 
 		// configure the config directory
-		this.configDirectory = new File(configDir);
-		LOG.info("Using configuration directory " + this.configDirectory.getAbsolutePath());
+		File configDirectory = new File(configDir);
+		LOG.info("Using configuration directory " + configDirectory.getAbsolutePath());
 
 		// load the configuration
 		LOG.info("Trying to load configuration file");
-		GlobalConfiguration.loadConfiguration(this.configDirectory.getAbsolutePath());
+		GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
 		this.config = GlobalConfiguration.getConfiguration();
 
 		// load the YARN properties
@@ -175,13 +173,9 @@ public class CliFrontend {
 
 			Properties yarnProperties = new Properties();
 			try {
-				InputStream is = new FileInputStream(propertiesFile);
-				try {
+				try (InputStream is = new FileInputStream(propertiesFile)) {
 					yarnProperties.load(is);
 				}
-				finally {
-					is.close();
-				}
 			}
 			catch (IOException e) {
 				throw new Exception("Cannot read the YARN properties file", e);
@@ -915,9 +909,9 @@ public class CliFrontend {
 		}
 		LOG.error("Error while running the command.", t);
 
+		System.err.println("\n------------------------------------------------------------");
+		System.err.println(" The program finished with the following exception:\n");
 		t.printStackTrace();
-		System.err.println();
-		System.err.println("The exception above occurred while trying to run your command.");
 		return 1;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/976bacc6/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
index d08046b..b117730 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
@@ -113,9 +114,9 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval
 		// =========== State Change Messages ===============
 
 		if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) {
-			logAndPrintMessage(message);
+			logAndPrintMessage((ExecutionGraphMessages.ExecutionStateChanged) message);
 		} else if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
-			logAndPrintMessage(message);
+			logAndPrintMessage((ExecutionGraphMessages.JobStatusChanged) message);
 		}
 
 		// ============ JobManager ActorRef resolution ===============
@@ -276,13 +277,30 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval
 		return leaderSessionID;
 	}
 
-	private void logAndPrintMessage(Object message) {
+	private void logAndPrintMessage(ExecutionGraphMessages.ExecutionStateChanged message) {
 		LOG.info(message.toString());
 		if (sysoutUpdates) {
 			System.out.println(message.toString());
 		}
 	}
 
+	private void logAndPrintMessage(ExecutionGraphMessages.JobStatusChanged message) {
+		// by default, this only prints the status, and not any exception.
+		// in state FAILING, we report the exception in addition
+		if (message.newJobStatus() != JobStatus.FAILING || message.error() == null) {
+			LOG.info(message.toString());
+			if (sysoutUpdates) {
+				System.out.println(message.toString());
+			}
+		} else {
+			LOG.info(message.toString(), message.error());
+			if (sysoutUpdates) {
+				System.out.println(message.toString());
+				message.error().printStackTrace(System.out);
+			}
+		}
+	}
+
 	@Override
 	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
 		getSelf().tell(