You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:19 UTC
[12/50] [abbrv] flink git commit: [FLINK-6341] [jm] Don't let JM fall into infinite loop
[FLINK-6341] [jm] Don't let JM fall into infinite loop
This closes #3745.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23838392
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23838392
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23838392
Branch: refs/heads/table-retraction
Commit: 238383926b762c1d47159a2b4dabe8fd59777307
Parents: c36d6b8
Author: WangTaoTheTonic <wa...@huawei.com>
Authored: Thu Apr 20 20:28:10 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Apr 28 17:59:27 2017 +0200
----------------------------------------------------------------------
.../messages/ReconnectResourceManager.java | 11 ++++++++++-
.../org/apache/flink/runtime/jobmanager/JobManager.scala | 7 +++++--
2 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/23838392/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
index 6f6f878..d02193e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
@@ -22,6 +22,8 @@ import akka.actor.ActorRef;
import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
import org.apache.flink.util.Preconditions;
+import java.util.UUID;
+
/**
* This message signals that the ResourceManager should reconnect to the JobManager. It is processed
* by the JobManager if it fails to register resources with the ResourceManager. The JobManager wants
@@ -33,14 +35,21 @@ public class ReconnectResourceManager implements RequiresLeaderSessionID, java.i
private final ActorRef resourceManager;
- public ReconnectResourceManager(ActorRef resourceManager) {
+ private final UUID currentConnID;
+
+ public ReconnectResourceManager(ActorRef resourceManager, UUID currentConnID) {
this.resourceManager = Preconditions.checkNotNull(resourceManager);
+ this.currentConnID = Preconditions.checkNotNull(currentConnID);
}
public ActorRef resourceManager() {
return resourceManager;
}
+ public UUID connID() {
+ return currentConnID;
+ }
+
@Override
public String toString() {
return "ReconnectResourceManager " + resourceManager.path();
http://git-wip-us.apache.org/repos/asf/flink/blob/23838392/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2fc3ef4..da9df2b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -178,6 +178,8 @@ class JobManager(
/** The resource manager actor responsible for allocating and managing task manager resources. */
var currentResourceManager: Option[ActorRef] = None
+ var currentRMConnID: UUID = null
+
val taskManagerMap = mutable.Map[ActorRef, InstanceID]()
/**
@@ -337,6 +339,7 @@ class JobManager(
// ditch current resource manager (if any)
currentResourceManager = Option(msg.resourceManager())
+ currentRMConnID = UUID.randomUUID()
val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
instance => instance.getTaskManagerID).toList.asJava
@@ -361,7 +364,7 @@ class JobManager(
}
currentResourceManager match {
- case Some(rm) if rm.equals(msg.resourceManager()) =>
+ case Some(rm) if rm.equals(msg.resourceManager()) && currentRMConnID.equals(msg.connID()) =>
// we should ditch the current resource manager
log.debug(s"Disconnecting resource manager $rm and forcing a reconnect.")
currentResourceManager = None
@@ -396,7 +399,7 @@ class JobManager(
case _ =>
log.warn("Failure while asking ResourceManager for RegisterResource. Retrying", t)
}
- self ! decorateMessage(new ReconnectResourceManager(rm))
+ self ! decorateMessage(new ReconnectResourceManager(rm, currentRMConnID))
}(context.dispatcher)
case None =>