You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/04/27 15:05:23 UTC

[2/2] flink git commit: [FLINK-3824] ResourceManager may repeatedly connect to outdated JobManager

[FLINK-3824] ResourceManager may repeatedly connect to outdated JobManager

When the ResourceManager receives a new leading JobManager via the
LeaderRetrievalService, it tries to register with this JobManager until
connected. If a new leader gets elected during registration, the
ResourceManager may still repeatedly try to register with the old
one. This doesn't affect the registration with the new JobManager, but
it leaves error messages in the log file and may cause unnecessary
messages to be processed. With this change, registration responses are
only processed while the ResourceManager is not yet connected to a
JobManager.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ec1300a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ec1300a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ec1300a

Branch: refs/heads/master
Commit: 7ec1300a740e9b8e1eb595bc15e752a167592e00
Parents: 8ad264d
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Apr 26 18:44:47 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Apr 27 14:50:17 2016 +0200

----------------------------------------------------------------------
 .../clusterframework/FlinkResourceManager.java  | 26 ++++++++++----------
 1 file changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ec1300a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index 3c1a698..a5c354c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -436,22 +436,22 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceID> extend
 
 			@Override
 			public void onComplete(Throwable failure, Object msg) {
-				if (msg != null) {
-					if (msg instanceof LeaderSessionMessage &&
-						((LeaderSessionMessage) msg).message() instanceof RegisterResourceManagerSuccessful)
-					{
-						self().tell(msg, ActorRef.noSender());
-					}
-					else {
-						LOG.error("Invalid response type to registration at JobManager: {}", msg);
+				// only process if we haven't been connected in the meantime
+				if (jobManager == null) {
+					if (msg != null) {
+						if (msg instanceof LeaderSessionMessage &&
+							((LeaderSessionMessage) msg).message() instanceof RegisterResourceManagerSuccessful) {
+							self().tell(msg, ActorRef.noSender());
+						} else {
+							LOG.error("Invalid response type to registration at JobManager: {}", msg);
+							self().tell(retryMessage, ActorRef.noSender());
+						}
+					} else {
+						// no success
+						LOG.error("Resource manager could not register at JobManager", failure);
 						self().tell(retryMessage, ActorRef.noSender());
 					}
 				}
-				else {
-					// no success
-					LOG.error("Resource manager could not register at JobManager", failure);
-					self().tell(retryMessage, ActorRef.noSender());
-				}
 			}
 
 		}, context().dispatcher());