You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:19 UTC

[12/50] [abbrv] flink git commit: [FLINK-6341] [jm] Don't let JM fall into infinite loop

[FLINK-6341] [jm] Don't let JM fall into infinite loop

This closes #3745.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23838392
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23838392
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23838392

Branch: refs/heads/table-retraction
Commit: 238383926b762c1d47159a2b4dabe8fd59777307
Parents: c36d6b8
Author: WangTaoTheTonic <wa...@huawei.com>
Authored: Thu Apr 20 20:28:10 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Apr 28 17:59:27 2017 +0200

----------------------------------------------------------------------
 .../messages/ReconnectResourceManager.java               | 11 ++++++++++-
 .../org/apache/flink/runtime/jobmanager/JobManager.scala |  7 +++++--
 2 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/23838392/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
index 6f6f878..d02193e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
@@ -22,6 +22,8 @@ import akka.actor.ActorRef;
 import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
 import org.apache.flink.util.Preconditions;
 
+import java.util.UUID;
+
 /**
  * This message signals that the ResourceManager should reconnect to the JobManager. It is processed
  * by the JobManager if it fails to register resources with the ResourceManager. The JobManager wants
@@ -33,14 +35,21 @@ public class ReconnectResourceManager implements RequiresLeaderSessionID, java.i
 
 	private final ActorRef resourceManager;
 
-	public ReconnectResourceManager(ActorRef resourceManager) {
+	private final UUID currentConnID;
+
+	public ReconnectResourceManager(ActorRef resourceManager, UUID currentConnID) {
 		this.resourceManager = Preconditions.checkNotNull(resourceManager);
+		this.currentConnID = Preconditions.checkNotNull(currentConnID);
 	}
 	
 	public ActorRef resourceManager() {
 		return resourceManager;
 	}
 
+	public UUID connID() {
+		return currentConnID;
+	}
+
 	@Override
 	public String toString() {
 		return "ReconnectResourceManager " + resourceManager.path();

http://git-wip-us.apache.org/repos/asf/flink/blob/23838392/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2fc3ef4..da9df2b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -178,6 +178,8 @@ class JobManager(
   /** The resource manager actor responsible for allocating and managing task manager resources. */
   var currentResourceManager: Option[ActorRef] = None
 
+  var currentRMConnID: UUID = null
+
   val taskManagerMap = mutable.Map[ActorRef, InstanceID]()
 
   /**
@@ -337,6 +339,7 @@ class JobManager(
 
       // ditch current resource manager (if any)
       currentResourceManager = Option(msg.resourceManager())
+      currentRMConnID = UUID.randomUUID()
 
       val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
         instance => instance.getTaskManagerID).toList.asJava
@@ -361,7 +364,7 @@ class JobManager(
       }
 
       currentResourceManager match {
-        case Some(rm) if rm.equals(msg.resourceManager()) =>
+        case Some(rm) if rm.equals(msg.resourceManager()) && currentRMConnID.equals(msg.connID()) =>
           // we should ditch the current resource manager
           log.debug(s"Disconnecting resource manager $rm and forcing a reconnect.")
           currentResourceManager = None
@@ -396,7 +399,7 @@ class JobManager(
                 case _ =>
                   log.warn("Failure while asking ResourceManager for RegisterResource. Retrying", t)
               }
-              self ! decorateMessage(new ReconnectResourceManager(rm))
+              self ! decorateMessage(new ReconnectResourceManager(rm, currentRMConnID))
           }(context.dispatcher)
 
         case None =>