You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/27 16:52:24 UTC

flink git commit: [FLINK-4889] [InstanceManager] Remove ActorRef dependency from InstanceManager

Repository: flink
Updated Branches:
  refs/heads/master 27fd2493e -> 09ff410a0


[FLINK-4889] [InstanceManager] Remove ActorRef dependency from InstanceManager

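The instance manager should not know about the underlying RPC abstraction, namely Akka.
Therefore, this commit removes the dependency on ActorRef from the InstanceManager.
Instances are now looked up by InstanceID or ResourceID, and the JobManager keeps its
own mapping from ActorRef to InstanceID.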

This closes #2698.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09ff410a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09ff410a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09ff410a

Branch: refs/heads/master
Commit: 09ff410a03330fabb6019a6afe8b2e32a0c9e8fb
Parents: 27fd249
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 24 00:25:13 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 27 18:51:52 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/instance/InstanceManager.java | 44 ++++++++----------
 .../flink/runtime/jobmanager/JobManager.scala   | 48 +++++++++++++-------
 .../flink/runtime/messages/Messages.scala       |  4 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 12 +++--
 4 files changed, 60 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09ff410a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 0c7e187..b0e7e57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -50,17 +50,14 @@ public class InstanceManager {
 	/** Set of hosts known to run a task manager that are thus able to execute tasks (by ID). */
 	private final Map<InstanceID, Instance> registeredHostsById;
 
-	/** Set of hosts known to run a task manager that are thus able to execute tasks (by connection). */
-	private final Map<ActorRef, Instance> registeredHostsByConnection;
-
 	/** Set of hosts known to run a task manager that are thus able to execute tasks (by ResourceID). */
 	private final Map<ResourceID, Instance> registeredHostsByResource;
 
 	/** Set of hosts that were present once and have died */
-	private final Set<ActorRef> deadHosts;
+	private final Set<ResourceID> deadHosts;
 
 	/** Listeners that want to be notified about availability and disappearance of instances */
-	private final List<InstanceListener> instanceListeners = new ArrayList<InstanceListener>();
+	private final List<InstanceListener> instanceListeners = new ArrayList<>();
 
 	/** The total number of task slots that the system has */
 	private int totalNumberOfAliveTaskSlots;
@@ -77,7 +74,6 @@ public class InstanceManager {
 	 */
 	public InstanceManager() {
 		this.registeredHostsById = new LinkedHashMap<>();
-		this.registeredHostsByConnection = new LinkedHashMap<>();
 		this.registeredHostsByResource = new LinkedHashMap<>();
 		this.deadHosts = new HashSet<>();
 	}
@@ -94,7 +90,6 @@ public class InstanceManager {
 			}
 
 			this.registeredHostsById.clear();
-			this.registeredHostsByConnection.clear();
 			this.registeredHostsByResource.clear();
 			this.deadHosts.clear();
 			this.totalNumberOfAliveTaskSlots = 0;
@@ -156,16 +151,16 @@ public class InstanceManager {
 				throw new IllegalStateException("InstanceManager is shut down.");
 			}
 
-			Instance prior = registeredHostsByConnection.get(taskManager);
+			Instance prior = registeredHostsByResource.get(taskManagerLocation.getResourceID());
 			if (prior != null) {
 				throw new IllegalStateException("Registration attempt from TaskManager at "
-					+ taskManager.path() +
+					+ taskManagerLocation.addressString() +
 					". This connection is already registered under ID " + prior.getId());
 			}
 
-			boolean wasDead = this.deadHosts.remove(taskManager);
+			boolean wasDead = this.deadHosts.remove(taskManagerLocation.getResourceID());
 			if (wasDead) {
-				LOG.info("Registering TaskManager at " + taskManager.path() +
+				LOG.info("Registering TaskManager at " + taskManagerLocation.addressString() +
 						" which was marked as dead earlier because of a heart-beat timeout.");
 			}
 
@@ -176,7 +171,6 @@ public class InstanceManager {
 			Instance host = new Instance(actorGateway, taskManagerLocation, instanceID, resources, numberOfSlots);
 
 			registeredHostsById.put(instanceID, host);
-			registeredHostsByConnection.put(taskManager, host);
 			registeredHostsByResource.put(taskManagerLocation.getResourceID(), host);
 
 			totalNumberOfAliveTaskSlots += numberOfSlots;
@@ -205,20 +199,19 @@ public class InstanceManager {
 	 * Unregisters the TaskManager with the given {@link ActorRef}. Unregistering means to mark
 	 * the given instance as dead and notify {@link InstanceListener} about the dead instance.
 	 *
-	 * @param instanceID TaskManager which is about to be marked dead.
+	 * @param instanceId TaskManager which is about to be marked dead.
 	 */
-	public void unregisterTaskManager(ActorRef instanceID, boolean terminated){
-		Instance instance = registeredHostsByConnection.get(instanceID);
+	public void unregisterTaskManager(InstanceID instanceId, boolean terminated){
+		Instance instance = registeredHostsById.get(instanceId);
 
 		if (instance != null){
 			ActorRef host = instance.getActorGateway().actor();
 
-			registeredHostsByConnection.remove(host);
 			registeredHostsById.remove(instance.getId());
 			registeredHostsByResource.remove(instance.getTaskManagerID());
 
 			if (terminated) {
-				deadHosts.add(instance.getActorGateway().actor());
+				deadHosts.add(instance.getTaskManagerID());
 			}
 
 			instance.markDead();
@@ -231,7 +224,7 @@ public class InstanceManager {
 					"registered task managers " + getNumberOfRegisteredTaskManagers() + ". Number" +
 					" of available slots " + getTotalNumberOfSlots() + ".");
 		} else {
-			LOG.warn("Tried to unregister instance {} but it is not registered.", instanceID);
+			LOG.warn("Tried to unregister instance {} but it is not registered.", instanceId);
 		}
 	}
 
@@ -240,7 +233,7 @@ public class InstanceManager {
 	 */
 	public void unregisterAllTaskManagers() {
 		for(Instance instance: registeredHostsById.values()) {
-			deadHosts.add(instance.getActorGateway().actor());
+			deadHosts.add(instance.getTaskManagerID());
 
 			instance.markDead();
 
@@ -250,12 +243,15 @@ public class InstanceManager {
 		}
 
 		registeredHostsById.clear();
-		registeredHostsByConnection.clear();
 		registeredHostsByResource.clear();
 	}
 
-	public boolean isRegistered(ActorRef taskManager) {
-		return registeredHostsByConnection.containsKey(taskManager);
+	public boolean isRegistered(InstanceID instanceId) {
+		return registeredHostsById.containsKey(instanceId);
+	}
+
+	public boolean isRegistered(ResourceID resourceId) {
+		return registeredHostsByResource.containsKey(resourceId);
 	}
 
 	public int getNumberOfRegisteredTaskManagers() {
@@ -290,10 +286,6 @@ public class InstanceManager {
 		return registeredHostsById.get(instanceID);
 	}
 
-	public Instance getRegisteredInstance(ActorRef ref) {
-		return registeredHostsByConnection.get(ref);
-	}
-
 	public Instance getRegisteredInstance(ResourceID ref) {
 		return registeredHostsByResource.get(ref);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/09ff410a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1539b8f..8ea053e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -51,7 +51,7 @@ import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionGraphBuilder, ExecutionJobVertex, StatusListenerMessenger}
-import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
+import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager}
 import org.apache.flink.runtime.io.network.PartitionState
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
@@ -68,7 +68,7 @@ import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, Accum
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry, MetricRegistryConfiguration}
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered}
 import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
@@ -83,6 +83,7 @@ import org.jboss.netty.channel.ChannelException
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ForkJoinPool
@@ -186,6 +187,8 @@ class JobManager(
   /** The resource manager actor responsible for allocating and managing task manager resources. */
   var currentResourceManager: Option[ActorRef] = None
 
+  val taskManagerMap = mutable.Map[ActorRef, InstanceID]()
+
   /**
    * Run when the job manager is started. Simply logs an informational message.
    * The method also starts the leader election service.
@@ -241,8 +244,8 @@ class JobManager(
 
     // disconnect the registered task managers
     instanceManager.getAllRegisteredInstances.asScala.foreach {
-      _.getActorGateway().tell(
-        Disconnect("JobManager is shutting down"),
+      instance => instance.getActorGateway().tell(
+        Disconnect(instance.getId, "JobManager is shutting down"),
         new AkkaActorGateway(self, leaderSessionID.orNull))
     }
 
@@ -331,12 +334,13 @@ class JobManager(
 
       // disconnect the registered task managers
       instanceManager.getAllRegisteredInstances.asScala.foreach {
-        _.getActorGateway().tell(
-          Disconnect("JobManager is no longer the leader"),
+        instance => instance.getActorGateway().tell(
+          Disconnect(instance.getId, "JobManager is no longer the leader"),
           new AkkaActorGateway(self, leaderSessionID.orNull))
       }
 
       instanceManager.unregisterAllTaskManagers()
+      taskManagerMap.clear()
 
       leaderSessionID = None
 
@@ -412,8 +416,8 @@ class JobManager(
       }
 
       // ResourceManager is told about the resource, now let's try to register TaskManager
-      if (instanceManager.isRegistered(taskManager)) {
-        val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
+      if (instanceManager.isRegistered(resourceId)) {
+        val instanceID = instanceManager.getRegisteredInstance(resourceId).getId
 
         taskManager ! decorateMessage(
           AlreadyRegistered(
@@ -428,6 +432,8 @@ class JobManager(
             numberOfSlots,
             leaderSessionID.orNull)
 
+          taskManagerMap.put(taskManager, instanceID)
+
           taskManager ! decorateMessage(
             AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort))
 
@@ -453,7 +459,7 @@ class JobManager(
       Option(instanceManager.getRegisteredInstance(resourceID)) match {
         case Some(instance) =>
           // trigger removal of task manager
-          handleTaskManagerTerminated(instance.getActorGateway.actor())
+          handleTaskManagerTerminated(instance.getActorGateway.actor(), instance.getId)
         case None =>
           log.debug(s"Resource $resourceID has not been registered at job manager.")
       }
@@ -1039,7 +1045,12 @@ class JobManager(
       gateway.forward(SendStackTrace, new AkkaActorGateway(sender, leaderSessionID.orNull))
 
     case Terminated(taskManagerActorRef) =>
-      handleTaskManagerTerminated(taskManagerActorRef)
+      taskManagerMap.get(taskManagerActorRef) match {
+        case Some(instanceId) => handleTaskManagerTerminated(taskManagerActorRef, instanceId)
+        case None =>  log.debug("Received terminated message for task manager " +
+                                  s"${taskManagerActorRef} which is not " +
+                                  "connected to this job manager.")
+      }
 
     case RequestJobManagerStatus =>
       sender() ! decorateMessage(JobManagerStatusAlive)
@@ -1071,13 +1082,14 @@ class JobManager(
         case None =>
       }
 
-    case Disconnect(msg) =>
+    case Disconnect(instanceId, msg) =>
       val taskManager = sender()
 
-      if (instanceManager.isRegistered(taskManager)) {
+      if (instanceManager.isRegistered(instanceId)) {
         log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.")
 
-        instanceManager.unregisterTaskManager(taskManager, false)
+        instanceManager.unregisterTaskManager(instanceId, false)
+        taskManagerMap.remove(taskManager)
         context.unwatch(taskManager)
       }
 
@@ -1123,13 +1135,15 @@ class JobManager(
     * Handler to be executed when a task manager terminates.
     * (Akka Deathwatch or notification from ResourceManager)
     *
-    * @param taskManager The ActorRef of the taskManager
+    * @param taskManager The ActorRef of the task manager
+    * @param instanceId identifying the dead task manager
     */
-  private def handleTaskManagerTerminated(taskManager: ActorRef): Unit = {
-    if (instanceManager.isRegistered(taskManager)) {
+  private def handleTaskManagerTerminated(taskManager: ActorRef, instanceId: InstanceID): Unit = {
+    if (instanceManager.isRegistered(instanceId)) {
       log.info(s"Task manager ${taskManager.path} terminated.")
 
-      instanceManager.unregisterTaskManager(taskManager, true)
+      instanceManager.unregisterTaskManager(instanceId, true)
+      taskManagerMap.remove(taskManager)
       context.unwatch(taskManager)
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/09ff410a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
index 75036b3..aa22914 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.messages
 
+import org.apache.flink.runtime.instance.InstanceID
+
 /**
  * Generic messages between JobManager, TaskManager, JobClient.
  */
@@ -39,7 +41,7 @@ object Messages {
    *
    * @param reason The reason for disconnecting, to be displayed in log and error messages.
    */
-  case class Disconnect(reason: String) extends RequiresLeaderSessionID
+  case class Disconnect(instanceId: InstanceID, reason: String) extends RequiresLeaderSessionID
 
   /**
    * Accessor for the case object instance, to simplify Java interoperability.

http://git-wip-us.apache.org/repos/asf/flink/blob/09ff410a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 1df6f92..8b597d0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -336,9 +336,13 @@ class TaskManager(
             s"from ${if (actor == null) null else actor.path}.")
       }
 
-    case Disconnect(msg) =>
-      handleJobManagerDisconnect(sender(), s"JobManager requested disconnect: $msg")
-      triggerTaskManagerRegistration()
+    case Disconnect(instanceIdToDisconnect, msg) =>
+      if (instanceIdToDisconnect.equals(instanceID)) {
+        handleJobManagerDisconnect(sender(), s"JobManager requested disconnect: $msg")
+        triggerTaskManagerRegistration()
+      } else {
+        log.debug(s"Received disconnect message for wrong instance id ${instanceIdToDisconnect}.")
+      }
 
     case msg: StopCluster =>
       log.info(s"Stopping TaskManager with final application status ${msg.finalStatus()} " +
@@ -1043,7 +1047,7 @@ class TaskManager(
 
     // de-register from the JobManager (faster detection of disconnect)
     currentJobManager foreach {
-      _ ! decorateMessage(Disconnect(s"TaskManager ${self.path} is disassociating"))
+      _ ! decorateMessage(Disconnect(instanceID, s"TaskManager ${self.path} is disassociating"))
     }
 
     currentJobManager = None