You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/08/02 18:25:32 UTC
[10/13] flink git commit: [FLINK-4299] show loss of job manager in Client
[FLINK-4299] show loss of job manager in Client
This prints a message when the leading JobManager changes after first
connecting to a JobManager. Further, it prints a message when a connection
to a JobManager has been established.
This closes #2322.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e09a4543
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e09a4543
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e09a4543
Branch: refs/heads/release-1.1
Commit: e09a45436961eb1214a96b5a5144f2ea919af77e
Parents: 2495184
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Aug 1 18:15:56 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Aug 2 20:24:36 2016 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/client/JobClientActor.java | 14 ++++++++++++++
1 file changed, 14 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e09a4543/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
index 2b3138a..9379c30 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -125,6 +125,11 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval
else if (message instanceof JobManagerLeaderAddress) {
JobManagerLeaderAddress msg = (JobManagerLeaderAddress) message;
+ if (jobManager != null) {
+ // only print this message when we had been connected to a JobManager before
+ logAndPrintMessage("New JobManager elected. Connecting to " + msg.address());
+ }
+
disconnectFromJobManager();
this.leaderSessionID = msg.leaderSessionID();
@@ -144,6 +149,8 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval
JobManagerActorRef msg = (JobManagerActorRef) message;
connectToJobManager(msg.jobManager());
+ logAndPrintMessage("Connected to JobManager at " + msg.jobManager());
+
if (jobGraph != null && !jobSuccessfullySubmitted) {
// if we haven't yet submitted the job successfully
tryToSubmitJob(jobGraph);
@@ -280,6 +287,13 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval
return leaderSessionID;
}
+ private void logAndPrintMessage(String message) {
+ LOG.info(message);
+ if (sysoutUpdates) {
+ System.out.println(message);
+ }
+ }
+
private void logAndPrintMessage(ExecutionGraphMessages.ExecutionStateChanged message) {
LOG.info(message.toString());
if (sysoutUpdates) {