You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/07/23 17:41:12 UTC

[4/6] flink git commit: [FLINK-2332] [runtime] Adds leader session IDs and registration session IDs to JobManager and TaskManager messages.

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index d78a594..f974946 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -20,13 +20,14 @@ package org.apache.flink.runtime.taskmanager
 
 import java.io.{File, IOException}
 import java.net.{InetAddress, InetSocketAddress}
+import java.util.UUID
 import java.util.concurrent.TimeUnit
 import java.lang.reflect.Method
 import java.lang.management.{OperatingSystemMXBean, ManagementFactory}
 
-import akka.actor._
-import akka.pattern.ask
-import akka.util.Timeout
+import _root_.akka.actor._
+import _root_.akka.pattern.ask
+import _root_.akka.util.Timeout
 
 import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
 import com.codahale.metrics.json.MetricsModule
@@ -36,9 +37,10 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
 
 import org.apache.flink.configuration.{Configuration, ConfigConstants, GlobalConfiguration, IllegalConfigurationException}
+
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage}
-import org.apache.flink.runtime.accumulators.{AccumulatorSnapshot, AccumulatorRegistry}
-import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessages, LogMessages, StreamingMode}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.{BlobService, BlobCache}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
@@ -46,7 +48,8 @@ import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, Ta
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.filecache.FileCache
-import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
+import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription,
+InstanceConnectionInfo, InstanceID}
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
 import org.apache.flink.runtime.io.network.NetworkEnvironment
@@ -114,15 +117,20 @@ import scala.language.postfixOps
  *    - Exceptions releasing intermediate result resources. Critical resource leak,
  *      requires a clean JVM.
  */
-class TaskManager(protected val config: TaskManagerConfiguration,
-                  protected val connectionInfo: InstanceConnectionInfo,
-                  protected val jobManagerAkkaURL: String,
-                  protected val memoryManager: MemoryManager,
-                  protected val ioManager: IOManager,
-                  protected val network: NetworkEnvironment,
-                  protected val numberOfSlots: Int)
-
-extends Actor with ActorLogMessages with ActorSynchronousLogging {
+class TaskManager(
+    protected val config: TaskManagerConfiguration,
+    protected val connectionInfo: InstanceConnectionInfo,
+    protected val jobManagerAkkaURL: String,
+    protected val memoryManager: MemoryManager,
+    protected val ioManager: IOManager,
+    protected val network: NetworkEnvironment,
+    protected val numberOfSlots: Int)
+  extends FlinkActor
+  with LeaderSessionMessages // Mixin order is important: second we want to filter leader messages
+  with LogMessages // Mixin order is important: first we want to support message logging
+{
+
+  override val log = Logger(getClass)
 
   /** The timeout for all actor ask futures */
   protected val askTimeout = new Timeout(config.timeout)
@@ -161,6 +169,10 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
   private var heartbeatScheduler: Option[Cancellable] = None
 
+  protected var leaderSessionID: Option[UUID] = None
+
+  private var currentRegistrationSessionID: UUID = UUID.randomUUID()
+
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle
   // --------------------------------------------------------------------------
@@ -183,8 +195,15 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
     // kick off the registration
     val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow)
 
-    self.tell(TriggerTaskManagerRegistration(jobManagerAkkaURL,
-                   TaskManager.INITIAL_REGISTRATION_TIMEOUT, deadline,1), ActorRef.noSender)
+    self ! decorateMessage(
+      TriggerTaskManagerRegistration(
+        currentRegistrationSessionID,
+        jobManagerAkkaURL,
+        TaskManager.INITIAL_REGISTRATION_TIMEOUT,
+        deadline,
+        1)
+    )
+
   }
 
   /**
@@ -236,8 +255,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
    * Central handling of actor messages. This method delegates to the more specialized
    * methods for handling certain classes of messages.
    */
-  override def receiveWithLogMessages: Receive = {
-
+  override def handleMessage: Receive = {
     // task messages are most common and critical, we handle them first
     case message: TaskMessage => handleTaskMessage(message)
 
@@ -259,7 +277,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
     // its registration at the JobManager
     case NotifyWhenRegisteredAtJobManager =>
       if (isConnected) {
-        sender ! RegisteredAtJobManager
+        sender ! decorateMessage(RegisteredAtJobManager)
       } else {
         waitForRegistration += sender
       }
@@ -339,12 +357,12 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
         // was into a terminal state, or in case the JobManager cannot be informed of the
         // state transition
 
-        case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
+      case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
 
           // we receive these from our tasks and forward them to the JobManager
           currentJobManager foreach {
             jobManager => {
-              val futureResponse = (jobManager ? updateMsg)(askTimeout)
+            val futureResponse = (jobManager ? decorateMessage(updateMsg))(askTimeout)
 
               val executionID = taskExecutionState.getID
 
@@ -353,13 +371,20 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
                 //            but only send messages to the TaskManager to do those changes
                 case Success(result) =>
                   if (!result) {
-                    self ! FailTask(executionID,
+                  self ! decorateMessage(
+                    FailTask(
+                      executionID,
                       new Exception("Task has been cancelled on the JobManager."))
+                  )
                   }
 
                 case Failure(t) =>
-                  self ! FailTask(executionID, new Exception(
-                    "Failed to send ExecutionStateChange notification to JobManager"))
+                self ! decorateMessage(
+                  FailTask(
+                    executionID,
+                    new Exception(
+                      "Failed to send ExecutionStateChange notification to JobManager"))
+                )
               }(context.dispatcher)
             }
           }
@@ -387,11 +412,15 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
           val task = runningTasks.get(executionID)
           if (task != null) {
             task.cancelExecution()
-            sender ! new TaskOperationResult(executionID, true)
+          sender ! decorateMessage(new TaskOperationResult(executionID, true))
           } else {
             log.debug(s"Cannot find task to cancel for execution ${executionID})")
-            sender ! new TaskOperationResult(executionID, false,
+          sender ! decorateMessage(
+            new TaskOperationResult(
+              executionID,
+              false,
             "No task with that execution ID was found.")
+          )
           }
 
         case PartitionState(taskExecutionId, taskResultId, partitionId, state) =>
@@ -413,7 +442,6 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
   private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = {
 
     actorMessage match {
-
       case message: TriggerCheckpoint =>
         val taskExecutionId = message.getTaskExecutionId
         val checkpointId = message.getCheckpointId
@@ -457,118 +485,148 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
    * @param message The registration message.
    */
   private def handleRegistrationMessage(message: RegistrationMessage): Unit = {
+    if(message.registrationSessionID.equals(currentRegistrationSessionID)) {
+      message match {
+        case TriggerTaskManagerRegistration(
+          registrationSessionID,
+          jobManagerURL,
+          timeout,
+          deadline,
+          attempt) =>
+
+          if (isConnected) {
+            // this may be the case, if we queue another attempt and
+            // in the meantime, the registration is acknowledged
+            log.debug(
+              "TaskManager was triggered to register at JobManager, but is already registered")
+          }
+          else if (deadline.exists(_.isOverdue())) {
+            // we failed to register in time. that means we should quit
+            log.error("Failed to register at the JobManager withing the defined maximum " +
+              "connect time. Shutting down ...")
 
-    message match {
-
-      case TriggerTaskManagerRegistration(jobManagerURL, timeout, deadline, attempt) =>
-        if (isConnected) {
-          // this may be the case, if we queue another attempt and
-          // in the meantime, the registration is acknowledged
-          log.debug(
-            "TaskManager was triggered to register at JobManager, but is already registered")
-        }
-        else if (deadline.exists(_.isOverdue())) {
-          // we failed to register in time. that means we should quit
-          log.error("Failed to register at the JobManager withing the defined maximum " +
-            "connect time. Shutting down ...")
+            // terminate ourselves (hasta la vista)
+            self ! decorateMessage(PoisonPill)
+          }
+          else {
+            log.info(s"Trying to register at JobManager ${jobManagerURL} " +
+              s"(attempt ${attempt}, timeout: ${timeout})")
+
+            val jobManager = context.actorSelection(jobManagerAkkaURL)
+            jobManager ! decorateMessage(
+              RegisterTaskManager(
+                registrationSessionID,
+                self,
+                connectionInfo,
+                resources,
+                numberOfSlots)
+            )
+
+            // the next timeout computes via exponential backoff with cap
+            val nextTimeout = (timeout * 2).min(TaskManager.MAX_REGISTRATION_TIMEOUT)
+
+            // schedule (with our timeout's delay) a check that triggers a new registration
+            // attempt, if we are not registered by then
+            context.system.scheduler.scheduleOnce(timeout) {
+              if (!isConnected) {
+                self ! decorateMessage(
+                  TriggerTaskManagerRegistration(
+                    registrationSessionID,
+                    jobManagerURL,
+                    nextTimeout,
+                    deadline,
+                    attempt + 1)
+                )
+              }
+            }(context.dispatcher)
+          }
 
-          // terminate ourselves (hasta la vista)
-          self ! PoisonPill
-        }
-        else {
-          log.info(s"Trying to register at JobManager ${jobManagerURL} " +
-            s"(attempt ${attempt}, timeout: ${timeout})")
-
-          val jobManager = context.actorSelection(jobManagerAkkaURL)
-          jobManager ! RegisterTaskManager(self, connectionInfo, resources, numberOfSlots)
-
-          // the next timeout computes via exponential backoff with cap
-          val nextTimeout = (timeout * 2).min(TaskManager.MAX_REGISTRATION_TIMEOUT)
-
-          // schedule (with our timeout s delay) a check triggers a new registration
-          // attempt, if we are not registered by then
-          context.system.scheduler.scheduleOnce(timeout) {
-            if (!isConnected) {
-              self.tell(TriggerTaskManagerRegistration(jobManagerURL,
-                                 nextTimeout, deadline, attempt + 1), ActorRef.noSender)
+        // successful registration. associate with the JobManager
+        // we disambiguate duplicate or erroneous messages, to simplify debugging
+        case AcknowledgeRegistration(_, leaderSessionID, jobManager, id, blobPort) =>
+          if (isConnected) {
+            if (jobManager == currentJobManager.orNull) {
+              log.debug("Ignoring duplicate registration acknowledgement.")
+            } else {
+              log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
+                s"because the TaskManager is already registered at ${currentJobManager.orNull}")
             }
-          }(context.dispatcher)
-        }
-
-      // successful registration. associate with the JobManager
-      // we disambiguate duplicate or erroneous messages, to simplify debugging
-      case AcknowledgeRegistration(jobManager, id, blobPort) =>
-        if (isConnected) {
-          if (jobManager == currentJobManager.orNull) {
-            log.debug("Ignoring duplicate registration acknowledgement.")
-          } else {
-            log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
-              s"because the TaskManager is already registered at ${currentJobManager.orNull}")
           }
-        }
-        else {
-          // not yet connected, so let's associate with that JobManager
-          try {
-            associateWithJobManager(jobManager, id, blobPort)
-          } catch {
-            case t: Throwable =>
-              killTaskManagerFatal(
-                "Unable to start TaskManager components after registering at JobManager", t)
+          else {
+            // not yet connected, so let's associate with that JobManager
+            try {
+              associateWithJobManager(jobManager, id, blobPort, leaderSessionID)
+            } catch {
+              case t: Throwable =>
+                killTaskManagerFatal(
+                  "Unable to start TaskManager components after registering at JobManager", t)
+            }
           }
-        }
 
-      // we are already registered at that specific JobManager - duplicate answer, rare cases
-      case AlreadyRegistered(jobManager, id, blobPort) =>
-        if (isConnected) {
-          if (jobManager == currentJobManager.orNull) {
-            log.debug("Ignoring duplicate registration acknowledgement.")
-          } else {
-            log.warn(s"Received 'AlreadyRegistered' message from JobManager ${jobManager.path}, " +
-              s"even through TaskManager is currently registered at ${currentJobManager.orNull}")
+        // we are already registered at that specific JobManager - duplicate answer, rare cases
+        case AlreadyRegistered(_, leaderSesssionID, jobManager, id, blobPort) =>
+          if (isConnected) {
+            if (jobManager == currentJobManager.orNull) {
+              log.debug("Ignoring duplicate registration acknowledgement.")
+            } else {
+              log.warn(s"Received 'AlreadyRegistered' message from " +
+                s"JobManager ${jobManager.path}, even through TaskManager is currently " +
+                s"registered at ${currentJobManager.orNull}")
+            }
           }
-        }
-        else {
-          // not connected, yet, to let's associate
-          log.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'")
-
-          try {
-            associateWithJobManager(jobManager, id, blobPort)
-          } catch {
-            case t: Throwable =>
-              killTaskManagerFatal(
-                "Unable to start TaskManager components after registering at JobManager", t)
+          else {
+            // not connected yet, so let's associate
+            log.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'")
+
+            try {
+              associateWithJobManager(jobManager, id, blobPort, leaderSesssionID)
+            } catch {
+              case t: Throwable =>
+                killTaskManagerFatal(
+                  "Unable to start TaskManager components after registering at JobManager", t)
+            }
           }
-        }
 
-      case RefuseRegistration(reason) =>
-        if (currentJobManager.isEmpty) {
-          log.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " +
-                    s"because: ${reason}. Retrying later...")
+        case RefuseRegistration(registrationSessionID, reason) =>
+          if (currentJobManager.isEmpty) {
+            log.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " +
+              s"because: ${reason}. Retrying later...")
 
-          // try the registration again after some time
+            // try the registration again after some time
 
-          val delay: FiniteDuration = TaskManager.DELAY_AFTER_REFUSED_REGISTRATION
-          val deadline: Option[Deadline] = config.maxRegistrationDuration.map {
-            timeout => timeout + delay fromNow
-          }
+            val delay: FiniteDuration = TaskManager.DELAY_AFTER_REFUSED_REGISTRATION
+            val deadline: Option[Deadline] = config.maxRegistrationDuration.map {
+              timeout => timeout + delay fromNow
+            }
 
-          context.system.scheduler.scheduleOnce(delay) {
-            self.tell(TriggerTaskManagerRegistration(jobManagerAkkaURL,
-                  TaskManager.INITIAL_REGISTRATION_TIMEOUT, deadline, 1), ActorRef.noSender)
-          }(context.dispatcher)
-        }
-        else {
-          // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
-          if (sender() == currentJobManager.orNull) {
-            log.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" +
-                     s" even though this TaskManager is already registered there.")
+            context.system.scheduler.scheduleOnce(delay) {
+              self ! decorateMessage(
+                TriggerTaskManagerRegistration(
+                  registrationSessionID,
+                  jobManagerAkkaURL,
+                  TaskManager.INITIAL_REGISTRATION_TIMEOUT,
+                  deadline,
+                  1)
+              )
+            }(context.dispatcher)
           }
           else {
-            log.warn(s"Ignoring 'RefuseRegistration' from unrelated JobManager (${sender().path})")
+            // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
+            if (sender() == currentJobManager.orNull) {
+              log.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" +
+                s" even though this TaskManager is already registered there.")
+            }
+            else {
+              log.warn(s"Ignoring 'RefuseRegistration' from unrelated " +
+                s"JobManager (${sender().path})")
+            }
           }
-        }
 
-      case _ => unhandled(message)
+        case _ => unhandled(message)
+      }
+    } else {
+      log.debug(s"Discarded registration message ${message}, because the registration session " +
+        "ID was not correct.")
     }
   }
 
@@ -593,20 +651,28 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
    * @param id The instanceID under which the TaskManager is registered
    *           at the JobManager.
    * @param blobPort The JobManager's port for the BLOB server.
+   * @param newLeaderSessionID Leader session ID of the JobManager
    */
-  private def associateWithJobManager(jobManager: ActorRef,
-                                      id: InstanceID,
-                                      blobPort: Int): Unit = {
+  private def associateWithJobManager(
+      jobManager: ActorRef,
+      id: InstanceID,
+      blobPort: Int,
+      newLeaderSessionID: UUID)
+    : Unit = {
 
     if (jobManager == null) {
-      throw new NullPointerException("jobManager may not be null")
+      throw new NullPointerException("jobManager must not be null.")
     }
     if (id == null) {
-      throw new NullPointerException("instance ID may not be null")
+      throw new NullPointerException("instance ID must not be null.")
     }
     if (blobPort <= 0 || blobPort > 65535) {
       throw new IllegalArgumentException("blob port is out of range: " + blobPort)
     }
+    if(newLeaderSessionID == null) {
+      throw new NullPointerException("Leader session ID must not be null.")
+    }
+
 
     // sanity check that we are not currently registered with a different JobManager
     if (isConnected) {
@@ -631,9 +697,18 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
       throw new IllegalStateException("JobManager-specific components are already initialized.")
     }
 
+    currentJobManager = Some(jobManager)
+    instanceID = id
+    leaderSessionID = Some(newLeaderSessionID)
+
     // start the network stack, now that we have the JobManager actor reference
     try {
-      network.associateWithTaskManagerAndJobManager(jobManager, self)
+      network.associateWithTaskManagerAndJobManager(
+        new AkkaActorGateway(jobManager, leaderSessionID),
+        new AkkaActorGateway(self, leaderSessionID)
+      )
+
+
     }
     catch {
       case e: Exception =>
@@ -665,16 +740,18 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
       libraryCacheManager = Some(new FallbackLibraryCacheManager)
     }
 
-    currentJobManager = Some(jobManager)
-    instanceID = id
-
     // watch job manager to detect when it dies
     context.watch(jobManager)
 
     // schedule regular heartbeat message for oneself
-    heartbeatScheduler = Some(context.system.scheduler.schedule(
-      TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat)
-       (context.dispatcher))
+    heartbeatScheduler = Some(
+      context.system.scheduler.schedule(
+        TaskManager.HEARTBEAT_INTERVAL,
+        TaskManager.HEARTBEAT_INTERVAL,
+        self,
+        decorateMessage(SendHeartbeat)
+      )(context.dispatcher)
+    )
 
     // notify all the actors that listen for a successful registration
     for (listener <- waitForRegistration) {
@@ -710,7 +787,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
     // de-register from the JobManager (faster detection of disconnect)
     currentJobManager foreach {
-      _ ! Disconnect(s"TaskManager ${self.path} is shutting down.")
+      _ ! decorateMessage(Disconnect(s"TaskManager ${self.path} is shutting down."))
     }
 
     currentJobManager = None
@@ -748,8 +825,14 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
           // begin attempts to reconnect
           val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow)
-          self ! TriggerTaskManagerRegistration(jobManagerAkkaURL,
-                               TaskManager.INITIAL_REGISTRATION_TIMEOUT, deadline, 1)
+          self ! decorateMessage(
+            TriggerTaskManagerRegistration(
+              currentRegistrationSessionID,
+              jobManagerAkkaURL,
+              TaskManager.INITIAL_REGISTRATION_TIMEOUT,
+              deadline,
+              1)
+          )
         }
         catch {
           // this is pretty bad, it leaves the TaskManager in a state where it cannot
@@ -795,8 +878,21 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
       // create the task. this does not grab any TaskManager resources or download
       // and libraries - the operation does not block
-      val task = new Task(tdd, memoryManager, ioManager, network, bcVarManager,
-                          self, jobManagerActor, config.timeout, libCache, fileCache)
+
+      val jobManagerGateway = new AkkaActorGateway(jobManagerActor, leaderSessionID)
+      val selfGateway = new AkkaActorGateway(self, leaderSessionID)
+
+      val task = new Task(
+        tdd,
+        memoryManager,
+        ioManager,
+        network,
+        bcVarManager,
+        selfGateway,
+        jobManagerGateway,
+        config.timeout,
+        libCache,
+        fileCache)
 
       log.info(s"Received task ${task.getTaskNameWithSubtasks}")
 
@@ -812,12 +908,12 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
       // all good, we kick off the task, which performs its own initialization
       task.startTaskThread()
       
-      sender ! Acknowledge
+      sender ! decorateMessage(Acknowledge)
     }
     catch {
       case t: Throwable => 
         log.error("SubmitTask failed", t)
-        sender ! Failure(t)
+        sender ! decorateMessage(Failure(t))
     }
   }
 
@@ -828,8 +924,9 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
    * @param partitionInfos The descriptor of the intermediate result partitions.
    */
   private def updateTaskInputPartitions(
-         executionId: ExecutionAttemptID,
-         partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)]) : Unit = {
+       executionId: ExecutionAttemptID,
+       partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)])
+    : Unit = {
 
     Option(runningTasks.get(executionId)) match {
       case Some(task) =>
@@ -867,15 +964,15 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
         }
 
         if (errors.isEmpty) {
-          sender ! Acknowledge
+          sender ! decorateMessage(Acknowledge)
         } else {
-          sender ! Failure(new Exception(errors.mkString("\n")))
+          sender ! decorateMessage(Failure(new Exception(errors.mkString("\n"))))
         }
 
       case None =>
         log.debug(s"Discard update for input partitions of task $executionId : " +
           s"task is no longer running.")
-        sender ! Acknowledge
+        sender ! decorateMessage(Acknowledge)
     }
   }
 
@@ -919,9 +1016,16 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
         registry.getSnapshot
       }
 
-      self ! UpdateTaskExecutionState(new TaskExecutionState(
-        task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause,
-        accumulators))
+        self ! decorateMessage(
+          UpdateTaskExecutionState(
+            new TaskExecutionState(
+              task.getJobID,
+              task.getExecutionId,
+              task.getExecutionState,
+              task.getFailureCause,
+              accumulators)
+          )
+        )
     }
     else {
       log.error(s"Cannot find task with ID $executionID to unregister.")
@@ -952,7 +1056,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
       }
 
        currentJobManager foreach {
-        jm => jm ! Heartbeat(instanceID, metricsReport, accumulatorEvents)
+        jm => jm ! decorateMessage(Heartbeat(instanceID, metricsReport, accumulatorEvents))
       }
     }
     catch {
@@ -972,14 +1076,12 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
     try {
       val traces = Thread.getAllStackTraces.asScala
-      val stackTraceStr = traces.map(
-        (trace: (Thread, Array[StackTraceElement])) => {
-          val (thread, elements) = trace
+      val stackTraceStr = traces.map {
+        case (thread: Thread, elements: Array[StackTraceElement]) =>
           "Thread: " + thread.getName + '\n' + elements.mkString("\n")
-          })
-        .mkString("\n\n")
+        }.mkString("\n\n")
 
-      recipient ! StackTrace(instanceID, stackTraceStr)
+      recipient ! decorateMessage(StackTrace(instanceID, stackTraceStr))
     }
     catch {
       case e: Exception => log.error("Failed to send stack trace to " + recipient.path, e)
@@ -999,7 +1101,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
       "\n" +
       "A fatal error occurred, forcing the TaskManager to shut down: " + message, cause)
 
-    self ! Kill
+    self ! decorateMessage(Kill)
   }
 }
 
@@ -1165,24 +1267,34 @@ object TaskManager {
    *                         Allows to use TaskManager subclasses for example for YARN.
    */
   @throws(classOf[Exception])
-  def selectNetworkInterfaceAndRunTaskManager(configuration: Configuration,
-                                              streamingMode: StreamingMode,
-                                              taskManagerClass: Class[_ <: TaskManager]) : Unit = {
+  def selectNetworkInterfaceAndRunTaskManager(
+      configuration: Configuration,
+      streamingMode: StreamingMode,
+      taskManagerClass: Class[_ <: TaskManager])
+    : Unit = {
 
     val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
 
-    val (taskManagerHostname, actorSystemPort) =
-       selectNetworkInterfaceAndPort(configuration, jobManagerHostname, jobManagerPort)
+    val (taskManagerHostname, actorSystemPort) = selectNetworkInterfaceAndPort(
+      configuration,
+      jobManagerHostname,
+      jobManagerPort)
 
-    runTaskManager(taskManagerHostname, actorSystemPort, configuration,
-                   streamingMode, taskManagerClass)
+    runTaskManager(
+      taskManagerHostname,
+      actorSystemPort,
+      configuration,
+      streamingMode,
+      taskManagerClass)
   }
 
   @throws(classOf[IOException])
   @throws(classOf[IllegalConfigurationException])
-  def selectNetworkInterfaceAndPort(configuration: Configuration,
-                                    jobManagerHostname: String,
-                                    jobManagerPort: Int) : (String, Int) = {
+  def selectNetworkInterfaceAndPort(
+      configuration: Configuration,
+      jobManagerHostname: String,
+      jobManagerPort: Int)
+    : (String, Int) = {
 
     var taskManagerHostname = configuration.getString(
       ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null)
@@ -1243,13 +1355,19 @@ object TaskManager {
    * @param configuration The configuration for the TaskManager.
    */
   @throws(classOf[Exception])
-  def runTaskManager(taskManagerHostname: String,
-                     actorSystemPort: Int, 
-                     configuration: Configuration,
-                     streamingMode: StreamingMode) : Unit = {
+  def runTaskManager(
+      taskManagerHostname: String,
+      actorSystemPort: Int,
+      configuration: Configuration,
+      streamingMode: StreamingMode)
+    : Unit = {
 
-    runTaskManager(taskManagerHostname, actorSystemPort, configuration,
-                   streamingMode, classOf[TaskManager])
+    runTaskManager(
+      taskManagerHostname,
+      actorSystemPort,
+      configuration,
+      streamingMode,
+      classOf[TaskManager])
   }
 
   /**
@@ -1269,11 +1387,13 @@ object TaskManager {
    *                         subclasses for example for YARN.
    */
   @throws(classOf[Exception])
-  def runTaskManager(taskManagerHostname: String,
-                     actorSystemPort: Int,
-                     configuration: Configuration,
-                     streamingMode: StreamingMode,
-                     taskManagerClass: Class[_ <: TaskManager]) : Unit = {
+  def runTaskManager(
+      taskManagerHostname: String,
+      actorSystemPort: Int,
+      configuration: Configuration,
+      streamingMode: StreamingMode,
+      taskManagerClass: Class[_ <: TaskManager])
+    : Unit = {
 
     LOG.info(s"Starting TaskManager in streaming mode $streamingMode")
 
@@ -1282,8 +1402,10 @@ object TaskManager {
     LOG.info(s"Starting TaskManager actor system at $taskManagerHostname:$actorSystemPort")
 
     val taskManagerSystem = try {
-      val akkaConfig = AkkaUtils.getAkkaConfig(configuration,
-                                               Some((taskManagerHostname, actorSystemPort)))
+      val akkaConfig = AkkaUtils.getAkkaConfig(
+        configuration,
+        Some((taskManagerHostname, actorSystemPort))
+      )
       if (LOG.isDebugEnabled) {
         LOG.debug("Using akka configuration\n " + akkaConfig)
       }
@@ -1307,13 +1429,14 @@ object TaskManager {
     // and the TaskManager actor
     try {
       LOG.info("Starting TaskManager actor")
-      val taskManager = startTaskManagerComponentsAndActor(configuration,
-                                                           taskManagerSystem,
-                                                           taskManagerHostname,
-                                                           Some(TASK_MANAGER_NAME),
-                                                           None, false,
-                                                           streamingMode,
-                                                           taskManagerClass)
+      val taskManager = startTaskManagerComponentsAndActor(
+        configuration,
+        taskManagerSystem,
+        taskManagerHostname,
+        Some(TASK_MANAGER_NAME),
+        None, false,
+        streamingMode,
+        taskManagerClass)
 
       // start a process reaper that watches the JobManager. If the TaskManager actor dies,
       // the process reaper will kill the JVM process (to ensure easy failure detection)
@@ -1433,14 +1556,15 @@ object TaskManager {
       configuredMemory << 20 // megabytes to bytes
     }
     else {
-      val fraction = configuration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-                                            ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
+      val fraction = configuration.getFloat(
+        ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+        ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
       checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction,
                            ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
                            "MemoryManager fraction of the free memory must be between 0.0 and 1.0")
 
       val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-                                                                                   fraction).toLong
+        fraction).toLong
 
       LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
         s"memory (${relativeMemSize >> 20} MB).")
@@ -1452,10 +1576,11 @@ object TaskManager {
 
     // now start the memory manager
     val memoryManager = try {
-      new DefaultMemoryManager(memorySize,
-                               taskManagerConfig.numberOfSlots,
-                               netConfig.networkBufferSize,
-                               preAllocateMemory)
+      new DefaultMemoryManager(
+        memorySize,
+        taskManagerConfig.numberOfSlots,
+        netConfig.networkBufferSize,
+        preAllocateMemory)
     }
     catch {
       case e: OutOfMemoryError => throw new Exception(
@@ -1472,7 +1597,8 @@ object TaskManager {
     }
 
     // create the actor properties (which define the actor constructor parameters)
-    val tmProps = Props(taskManagerClass,
+    val tmProps = Props(
+      taskManagerClass,
       taskManagerConfig,
       connectionInfo,
       jobManagerAkkaUrl,
@@ -1502,9 +1628,11 @@ object TaskManager {
    * @return The ActorRef to the TaskManager
    */
   @throws(classOf[IOException])
-  def getTaskManagerRemoteReference(taskManagerUrl: String,
-                                    system: ActorSystem,
-                                    timeout: FiniteDuration): ActorRef = {
+  def getTaskManagerRemoteReference(
+      taskManagerUrl: String,
+      system: ActorSystem,
+      timeout: FiniteDuration)
+    : ActorRef = {
     try {
       val future = AkkaUtils.getReference(taskManagerUrl, system, timeout)
       Await.result(future, timeout)
@@ -1536,10 +1664,11 @@ object TaskManager {
    *                  InstanceConnectionInfo, JobManager actor Akka URL).
    */
   @throws(classOf[IllegalArgumentException])
-  def parseTaskManagerConfiguration(configuration: Configuration,
-                                    taskManagerHostname: String,
-                                    localTaskManagerCommunication: Boolean):
-    (TaskManagerConfiguration,
+  def parseTaskManagerConfiguration(
+      configuration: Configuration,
+      taskManagerHostname: String,
+      localTaskManagerCommunication: Boolean)
+    : (TaskManagerConfiguration,
      NetworkEnvironmentConfiguration,
      InstanceConnectionInfo) = {
 
@@ -1579,10 +1708,10 @@ object TaskManager {
       ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)
     
     val pageSizeNew: Int = configuration.getInteger(
-                                        ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1)
+      ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1)
     
     val pageSizeOld: Int = configuration.getInteger(
-                                        ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
+      ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
 
     val pageSize: Int =
       if (pageSizeNew != -1) {
@@ -1617,7 +1746,7 @@ object TaskManager {
     val tmpDirs = configuration.getString(
       ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
       ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH)
-      .split(",|" + File.pathSeparator)
+    .split(",|" + File.pathSeparator)
 
     val nettyConfig = if (localTaskManagerCommunication) {
       None
@@ -1633,7 +1762,10 @@ object TaskManager {
     val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC
 
     val networkConfig = NetworkEnvironmentConfiguration(
-      numNetworkBuffers, pageSize, ioMode, nettyConfig)
+      numNetworkBuffers,
+      pageSize,
+      ioMode,
+      nettyConfig)
 
     // ----> timeouts, library caching, profiling
 
@@ -1667,8 +1799,12 @@ object TaskManager {
         e)
     }
 
-    val taskManagerConfig = TaskManagerConfiguration(tmpDirs, cleanupInterval, timeout,
-      finiteRegistratioDuration, slots,
+    val taskManagerConfig = TaskManagerConfiguration(
+      tmpDirs,
+      cleanupInterval,
+      timeout,
+      finiteRegistratioDuration,
+      slots,
       configuration)
 
     (taskManagerConfig, networkConfig, connectionInfo)
@@ -1720,10 +1856,12 @@ object TaskManager {
    * @throws IllegalConfigurationException Thrown if the condition is violated.
    */
   @throws(classOf[IllegalConfigurationException])
-  private def checkConfigParameter(condition: Boolean,
-                                   parameter: Any,
-                                   name: String,
-                                   errorMessage: String = ""): Unit = {
+  private def checkConfigParameter(
+      condition: Boolean,
+      parameter: Any,
+      name: String,
+      errorMessage: String = "")
+    : Unit = {
     if (!condition) {
       throw new IllegalConfigurationException(
         s"Invalid configuration value for '${name}' : ${parameter} - ${errorMessage}")

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
new file mode 100644
index 0000000..324b014
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.Option;
+
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FlinkUntypedActorTest {
+
+	private static ActorSystem actorSystem;
+
+	@BeforeClass
+	public static void setup() {
+		actorSystem = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
+	}
+
+	@AfterClass
+	public static void teardown() {
+		JavaTestKit.shutdownActorSystem(actorSystem);
+	}
+
+	@Test
+	public void testLeaderSessionMessageFilteringOfFlinkUntypedActor() {
+		final Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID());
+		final Option<UUID> oldSessionID = Option.apply(UUID.randomUUID());
+
+		TestActorRef<PlainFlinkUntypedActor> actor = null;
+
+		try {
+			actor = TestActorRef.create(
+					actorSystem, Props.create(PlainFlinkUntypedActor.class, leaderSessionID));
+
+			final PlainFlinkUntypedActor underlyingActor = actor.underlyingActor();
+
+			actor.tell(new JobManagerMessages.LeaderSessionMessage(leaderSessionID, 1), ActorRef.noSender());
+			actor.tell(new JobManagerMessages.LeaderSessionMessage(oldSessionID, 2), ActorRef.noSender());
+			actor.tell(new JobManagerMessages.LeaderSessionMessage(leaderSessionID, 2), ActorRef.noSender());
+			actor.tell(1, ActorRef.noSender());
+
+			assertEquals(3, underlyingActor.getMessageCounter());
+
+		} finally {
+			stopActor(actor);
+		}
+	}
+
+	@Test
+	public void testThrowingExceptionWhenReceivingNonWrappedRequiresLeaderSessionIDMessage() {
+		final Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID());
+
+		TestActorRef<PlainFlinkUntypedActor> actor = null;
+
+		try{
+			final Props props = Props.create(PlainFlinkUntypedActor.class, leaderSessionID);
+			actor = TestActorRef.create(actorSystem, props);
+
+			actor.receive(new JobManagerMessages.LeaderSessionMessage(leaderSessionID, 1));
+
+			try {
+				actor.receive(new PlainRequiresLeaderSessionID());
+
+				fail("Expected an exception to be thrown, because a RequiresLeaderSessionID" +
+						"message was sent without being wrapped in LeaderSessionMessage.");
+			} catch (Exception e) {
+				assertEquals("Received a message PlainRequiresLeaderSessionID " +
+						"without a leader session ID, even though it requires to have one.",
+						e.getMessage());
+			}
+
+		} finally {
+			stopActor(actor);
+		}
+	}
+
+	private static void stopActor(ActorRef actor) {
+		if(actor != null) {
+			actor.tell(Kill.getInstance(), ActorRef.noSender());
+		}
+	}
+
+
+	static class PlainFlinkUntypedActor extends FlinkUntypedActor {
+
+		private Option<UUID> leaderSessionID;
+
+		private int messageCounter;
+
+		public PlainFlinkUntypedActor(Option<UUID> leaderSessionID) {
+			this.leaderSessionID = leaderSessionID;
+			this.messageCounter = 0;
+		}
+
+		@Override
+		protected void handleMessage(Object message) throws Exception {
+			messageCounter++;
+		}
+
+		@Override
+		protected Option<UUID> getLeaderSessionID() {
+			return leaderSessionID;
+		}
+
+		public int getMessageCounter() {
+			return messageCounter;
+		}
+	}
+
+	static class PlainRequiresLeaderSessionID implements RequiresLeaderSessionID {
+		@Override
+		public String toString() {
+			return "PlainRequiresLeaderSessionID";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 932e366..b124304 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -62,18 +60,19 @@ public class CoordinatorShutdownTest {
 			JobGraph testGraph = new JobGraph("test job", vertex);
 			testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
 			
-			ActorRef jobManager = cluster.getJobManager();
+			ActorGateway jobManager = cluster.getJobManagerGateway();
 
 			FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS);
 			JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(testGraph, false);
 			
 			// submit is successful, but then the job dies because no TaskManager / slot is available
-			Future<Object> submitFuture = Patterns.ask(jobManager, submitMessage, timeout.toMillis());
+			Future<Object> submitFuture = jobManager.ask(submitMessage, timeout);
 			Await.result(submitFuture, timeout);
 
 			// get the execution graph and make sure the coordinator is properly shut down
-			Future<Object> jobRequestFuture = Patterns.ask(jobManager,
-					new JobManagerMessages.RequestJob(testGraph.getJobID()), timeout.toMillis());
+			Future<Object> jobRequestFuture = jobManager.ask(
+					new JobManagerMessages.RequestJob(testGraph.getJobID()),
+					timeout);
 			
 			ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
 			
@@ -109,18 +108,19 @@ public class CoordinatorShutdownTest {
 			JobGraph testGraph = new JobGraph("test job", vertex);
 			testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
 			
-			ActorRef jobManager = cluster.getJobManager();
+			ActorGateway jobManager = cluster.getJobManagerGateway();
 
 			FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS);
 			JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(testGraph, false);
 
 			// submit is successful, but then the job dies because no TaskManager / slot is available
-			Future<Object> submitFuture = Patterns.ask(jobManager, submitMessage, timeout.toMillis());
+			Future<Object> submitFuture = jobManager.ask(submitMessage, timeout);
 			Await.result(submitFuture, timeout);
 
 			// get the execution graph and make sure the coordinator is properly shut down
-			Future<Object> jobRequestFuture = Patterns.ask(jobManager,
-					new JobManagerMessages.RequestJob(testGraph.getJobID()), timeout.toMillis());
+			Future<Object> jobRequestFuture = jobManager.ask(
+					new JobManagerMessages.RequestJob(testGraph.getJobID()),
+					timeout);
 
 			ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index cff7146..e3fc852 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -92,7 +92,7 @@ public class ExecutionGraphDeploymentTest {
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jid2);
 			ExecutionVertex vertex = ejv.getTaskVertices()[3];
 
-			ExecutionGraphTestUtils.SimpleInstanceGateway instanceGateway = new ExecutionGraphTestUtils.SimpleInstanceGateway(TestingUtils.directExecutionContext());
+			ExecutionGraphTestUtils.SimpleActorGateway instanceGateway = new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext());
 
 			final Instance instance = getInstance(instanceGateway);
 
@@ -295,7 +295,7 @@ public class ExecutionGraphDeploymentTest {
 		for (int i = 0; i < dop1 + dop2; i++) {
 			scheduler.newInstanceAvailable(
 					ExecutionGraphTestUtils.getInstance(
-							new ExecutionGraphTestUtils.SimpleInstanceGateway(
+							new ExecutionGraphTestUtils.SimpleActorGateway(
 									TestingUtils.directExecutionContext())));
 		}
 		assertEquals(dop1 + dop2, scheduler.getNumberOfAvailableSlots());

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 8a63060..64d4c44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -24,18 +24,17 @@ import static org.mockito.Mockito.spy;
 
 import java.lang.reflect.Field;
 import java.net.InetAddress;
-import java.util.LinkedList;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.BaseTestingInstanceGateway;
+import org.apache.flink.runtime.instance.BaseTestingActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.instance.InstanceGateway;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -101,11 +100,11 @@ public class ExecutionGraphTestUtils {
 	//  utility mocking methods
 	// --------------------------------------------------------------------------------------------
 
-	public static Instance getInstance(final InstanceGateway gateway) throws Exception {
+	public static Instance getInstance(final ActorGateway gateway) throws Exception {
 		return getInstance(gateway, 1);
 	}
 
-	public static Instance getInstance(final InstanceGateway gateway, final int numberOfSlots) throws Exception {
+	public static Instance getInstance(final ActorGateway gateway, final int numberOfSlots) throws Exception {
 		HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
 		InetAddress address = InetAddress.getByName("127.0.0.1");
 		InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
@@ -113,10 +112,10 @@ public class ExecutionGraphTestUtils {
 		return new Instance(gateway, connection, new InstanceID(), hardwareDescription, numberOfSlots);
 	}
 
-	public static class SimpleInstanceGateway extends BaseTestingInstanceGateway {
+	public static class SimpleActorGateway extends BaseTestingActorGateway {
 		public TaskDeploymentDescriptor lastTDD;
 
-		public SimpleInstanceGateway(ExecutionContext executionContext){
+		public SimpleActorGateway(ExecutionContext executionContext){
 			super(executionContext);
 		}
 
@@ -140,8 +139,8 @@ public class ExecutionGraphTestUtils {
 		}
 	}
 
-	public static class SimpleFailingInstanceGateway extends BaseTestingInstanceGateway {
-		public SimpleFailingInstanceGateway(ExecutionContext executionContext) {
+	public static class SimpleFailingActorGateway extends BaseTestingActorGateway {
+		public SimpleFailingActorGateway(ExecutionContext executionContext) {
 			super(executionContext);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index f47e92c..89b82f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -63,7 +63,7 @@ public class ExecutionStateProgressTest {
 			// mock resources and mock taskmanager
 			for (ExecutionVertex ee : ejv.getTaskVertices()) {
 				SimpleSlot slot = getInstance(
-						new SimpleInstanceGateway(
+						new SimpleActorGateway(
 								TestingUtils.defaultExecutionContext())
 				).allocateSimpleSlot(jid);
 				ee.deployToSlot(slot);

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 9db330b..e9b67af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -26,9 +26,9 @@ import java.io.IOException;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.BaseTestingInstanceGateway;
-import org.apache.flink.runtime.instance.DummyInstanceGateway;
-import org.apache.flink.runtime.instance.InstanceGateway;
+import org.apache.flink.runtime.instance.BaseTestingActorGateway;
+import org.apache.flink.runtime.instance.DummyActorGateway;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.api.common.JobID;
@@ -125,12 +125,12 @@ public class ExecutionVertexCancelTest {
 			setVertexState(vertex, ExecutionState.SCHEDULED);
 			assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
 
-			InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+			ActorGateway actorGateway = new CancelSequenceActorGateway(
 					executionContext,
 					new TaskOperationResult(execId, true),
 					new TaskOperationResult(execId, false));
 
-			Instance instance = getInstance(instanceGateway);
+			Instance instance = getInstance(actorGateway);
 			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 			vertex.deployToSlot(slot);
@@ -195,13 +195,13 @@ public class ExecutionVertexCancelTest {
 
 			// task manager cancel sequence mock actor
 			// first return NOT SUCCESS (task not found, cancel call overtook deploy call), then success (cancel call after deploy call)
-			InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+			ActorGateway actorGateway = new CancelSequenceActorGateway(
 					executionContext,
 					new	TaskOperationResult(execId, false),
 					new TaskOperationResult(execId, true)
 			);
 
-			Instance instance = getInstance(instanceGateway);
+			Instance instance = getInstance(actorGateway);
 			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 			vertex.deployToSlot(slot);
@@ -258,11 +258,11 @@ public class ExecutionVertexCancelTest {
 					AkkaUtils.getDefaultTimeout());
 			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
-			InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+			ActorGateway actorGateway = new CancelSequenceActorGateway(
 					TestingUtils.directExecutionContext(),
 					new TaskOperationResult(execId, true));
 
-			Instance instance = getInstance(instanceGateway);
+			Instance instance = getInstance(actorGateway);
 			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 			setVertexState(vertex, ExecutionState.RUNNING);
@@ -299,12 +299,12 @@ public class ExecutionVertexCancelTest {
 					AkkaUtils.getDefaultTimeout());
 			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
-			final InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+			final ActorGateway actorGateway = new CancelSequenceActorGateway(
 					TestingUtils.directExecutionContext(),
 					new TaskOperationResult(execId, true)
 			);
 
-			Instance instance = getInstance(instanceGateway);
+			Instance instance = getInstance(actorGateway);
 			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 			setVertexState(vertex, ExecutionState.RUNNING);
@@ -350,12 +350,12 @@ public class ExecutionVertexCancelTest {
 			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
 
-			final InstanceGateway instanceGateway = new CancelSequenceInstanceGateway(
+			final ActorGateway actorGateway = new CancelSequenceActorGateway(
 					TestingUtils.directExecutionContext(),
 					new TaskOperationResult(execId, false)
 			);
 
-			Instance instance = getInstance(instanceGateway);
+			Instance instance = getInstance(actorGateway);
 			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 			setVertexState(vertex, ExecutionState.RUNNING);
@@ -386,7 +386,7 @@ public class ExecutionVertexCancelTest {
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
 
-			final InstanceGateway gateway = new CancelSequenceInstanceGateway(TestingUtils.directExecutionContext());
+			final ActorGateway gateway = new CancelSequenceActorGateway(TestingUtils.directExecutionContext());
 
 			Instance instance = getInstance(gateway);
 			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
@@ -423,7 +423,7 @@ public class ExecutionVertexCancelTest {
 					AkkaUtils.getDefaultTimeout());
 			final ExecutionAttemptID execID = vertex.getCurrentExecutionAttempt().getAttemptId();
 
-			final InstanceGateway gateway = new CancelSequenceInstanceGateway(
+			final ActorGateway gateway = new CancelSequenceActorGateway(
 					TestingUtils.defaultExecutionContext(),
 					new TaskOperationResult(execID, true));
 
@@ -482,7 +482,7 @@ public class ExecutionVertexCancelTest {
 			// deploying after canceling from CREATED needs to raise an exception, because
 			// the scheduler (or any caller) needs to know that the slot should be released
 			try {
-				Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
+				Instance instance = getInstance(DummyActorGateway.INSTANCE);
 				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 				vertex.deployToSlot(slot);
@@ -525,7 +525,7 @@ public class ExecutionVertexCancelTest {
 						AkkaUtils.getDefaultTimeout());
 				setVertexState(vertex, ExecutionState.CANCELING);
 
-				Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
+				Instance instance = getInstance(DummyActorGateway.INSTANCE);
 				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 				vertex.deployToSlot(slot);
@@ -541,7 +541,7 @@ public class ExecutionVertexCancelTest {
 				ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 						AkkaUtils.getDefaultTimeout());
 
-				Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
+				Instance instance = getInstance(DummyActorGateway.INSTANCE);
 				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 				setVertexResource(vertex, slot);
@@ -562,11 +562,11 @@ public class ExecutionVertexCancelTest {
 		}
 	}
 
-	public static class CancelSequenceInstanceGateway extends BaseTestingInstanceGateway {
+	public static class CancelSequenceActorGateway extends BaseTestingActorGateway {
 		private final TaskOperationResult[] results;
 		private int index = -1;
 
-		public CancelSequenceInstanceGateway(ExecutionContext executionContext, TaskOperationResult ... result) {
+		public CancelSequenceActorGateway(ExecutionContext executionContext, TaskOperationResult... result) {
 			super(executionContext);
 			this.results = result;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 431c3a9..81ec6c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -43,7 +43,7 @@ public class ExecutionVertexDeploymentTest {
 
 			// mock taskmanager to simply accept the call
 			Instance instance = getInstance(
-					new SimpleInstanceGateway(TestingUtils.directExecutionContext()));
+					new SimpleActorGateway(TestingUtils.directExecutionContext()));
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
@@ -81,7 +81,7 @@ public class ExecutionVertexDeploymentTest {
 			final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
 
 			final Instance instance = getInstance(
-					new SimpleInstanceGateway(TestingUtils.directExecutionContext()));
+					new SimpleActorGateway(TestingUtils.directExecutionContext()));
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
@@ -124,7 +124,7 @@ public class ExecutionVertexDeploymentTest {
 					AkkaUtils.getDefaultTimeout());
 
 			final Instance instance = getInstance(
-					new SimpleInstanceGateway(TestingUtils.defaultExecutionContext()));
+					new SimpleActorGateway(TestingUtils.defaultExecutionContext()));
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -171,7 +171,7 @@ public class ExecutionVertexDeploymentTest {
 					AkkaUtils.getDefaultTimeout());
 
 			final Instance instance = getInstance(
-					new SimpleFailingInstanceGateway(TestingUtils.directExecutionContext()));
+					new SimpleFailingActorGateway(TestingUtils.directExecutionContext()));
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -201,7 +201,7 @@ public class ExecutionVertexDeploymentTest {
 					AkkaUtils.getDefaultTimeout());
 
 			final Instance instance = getInstance(
-					new SimpleFailingInstanceGateway(TestingUtils.directExecutionContext()));
+					new SimpleFailingActorGateway(TestingUtils.directExecutionContext()));
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -244,7 +244,7 @@ public class ExecutionVertexDeploymentTest {
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
 
-			final Instance instance = getInstance(new SimpleInstanceGateway(TestingUtils.directExecutionContext()));
+			final Instance instance = getInstance(new SimpleActorGateway(TestingUtils.directExecutionContext()));
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -286,7 +286,7 @@ public class ExecutionVertexDeploymentTest {
 			final ExecutionAttemptID eid = vertex.getCurrentExecutionAttempt().getAttemptId();
 
 			final Instance instance = getInstance(
-					new ExecutionVertexCancelTest.CancelSequenceInstanceGateway(
+					new ExecutionVertexCancelTest.CancelSequenceActorGateway(
 							context,
 							new TaskOperationResult(eid, false),
 							new TaskOperationResult(eid, true)));

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 8ea7017..5e9ee33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -25,7 +25,7 @@ import static org.mockito.Mockito.*;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.DummyInstanceGateway;
+import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -47,7 +47,7 @@ public class ExecutionVertexSchedulingTest {
 					AkkaUtils.getDefaultTimeout());
 
 			// a slot than cannot be deployed to
-			final Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
+			final Instance instance = getInstance(DummyActorGateway.INSTANCE);
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 			
 			slot.releaseSlot();
@@ -77,7 +77,7 @@ public class ExecutionVertexSchedulingTest {
 					AkkaUtils.getDefaultTimeout());
 
 			// a slot than cannot be deployed to
-			final Instance instance = getInstance(DummyInstanceGateway.INSTANCE);
+			final Instance instance = getInstance(DummyActorGateway.INSTANCE);
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			slot.releaseSlot();
@@ -113,7 +113,7 @@ public class ExecutionVertexSchedulingTest {
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
 
-			final Instance instance = getInstance(new ExecutionGraphTestUtils.SimpleInstanceGateway(TestingUtils.defaultExecutionContext()));
+			final Instance instance = getInstance(new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.defaultExecutionContext()));
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			Scheduler scheduler = mock(Scheduler.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index b4a7e63..2530a53 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -377,7 +377,7 @@ public class LocalInputSplitsTest {
 		when(connection.getFQDNHostname()).thenReturn(hostname);
 		
 		return new Instance(
-				new ExecutionGraphTestUtils.SimpleInstanceGateway(
+				new ExecutionGraphTestUtils.SimpleActorGateway(
 						TestingUtils.defaultExecutionContext()),
 				connection,
 				new InstanceID(),

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index b779d79..f42543f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.DummyInstanceGateway;
+import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -78,7 +78,7 @@ public class TerminalStateDeadlockTest {
 			InstanceConnectionInfo ci = new InstanceConnectionInfo(address, 12345);
 				
 			HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000);
-			Instance instance = new Instance(DummyInstanceGateway.INSTANCE, ci, new InstanceID(), resources, 4);
+			Instance instance = new Instance(DummyActorGateway.INSTANCE, ci, new InstanceID(), resources, 4);
 
 			this.resource = instance.allocateSimpleSlot(new JobID());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index 3305254..8604b63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -26,7 +26,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.runtime.instance.DummyInstanceGateway;
+import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -403,7 +403,7 @@ public class VertexLocationConstraintTest {
 			
 			ExecutionVertex ev = eg.getAllVertices().get(vertex.getID()).getTaskVertices()[0];
 			
-			Instance instance = ExecutionGraphTestUtils.getInstance(DummyInstanceGateway.INSTANCE);
+			Instance instance = ExecutionGraphTestUtils.getInstance(DummyActorGateway.INSTANCE);
 			ev.setLocationConstraintHosts(Collections.singletonList(instance));
 			
 			assertNotNull(ev.getPreferredLocations());
@@ -435,7 +435,7 @@ public class VertexLocationConstraintTest {
 		when(connection.getFQDNHostname()).thenReturn(hostname);
 		
 		return new Instance(
-				new ExecutionGraphTestUtils.SimpleInstanceGateway(
+				new ExecutionGraphTestUtils.SimpleActorGateway(
 						TestingUtils.defaultExecutionContext()),
 				connection,
 				new InstanceID(),

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
new file mode 100644
index 0000000..2e62781
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import scala.Option;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract base class for {@link ActorGateway} test doubles. Subclasses only implement
+ * {@link #handleMessage(Object)}; ask, both tell variants and retry route every message
+ * through it. The handler is invoked directly by these methods, on the calling thread.
+ */
+abstract public class BaseTestingActorGateway implements ActorGateway {
+	/**
+	 * {@link ExecutionContext} on which the futures created by ask are executed.
+	 */
+	private final ExecutionContext executionContext;
+
+	public BaseTestingActorGateway(ExecutionContext executionContext) {
+		this.executionContext = executionContext;
+	}
+
+	@Override
+	public Future<Object> ask(Object message, FiniteDuration timeout) { // timeout is not used
+		try {
+			final Object result = handleMessage(message); // evaluated eagerly, before the future exists
+
+			return Futures.future(new Callable<Object>() {
+				@Override
+				public Object call() throws Exception {
+					return result;
+				}
+			}, executionContext);
+
+		} catch (final Exception e) {
+			// handleMessage threw: rethrow the same exception inside the future so the caller sees it there
+			return Futures.future(new Callable<Object>() {
+				@Override
+				public Object call() throws Exception {
+					throw e;
+				}
+			}, executionContext);
+		}
+	}
+
+	/**
+	 * Handles the messages supported by this ActorGateway.
+	 *
+	 * @param message Message to handle
+	 * @return Result of handling the message; ask delivers it through the returned future
+	 * @throws Exception if handling fails; ask rethrows it inside its future, tell discards it
+	 */
+	abstract public Object handleMessage(Object message) throws Exception;
+
+	@Override
+	public void tell(Object message) {
+		try {
+			handleMessage(message); // the handler's return value is dropped
+		} catch (Exception e) {
+			// discard the exception because it happens on the "remote" instance
+		}
+	}
+
+	@Override
+	public void tell(Object message, ActorGateway sender) { // sender is not used
+		try{
+			handleMessage(message); // the handler's return value is dropped
+		} catch (Exception e) {
+			// discard the exception because it happens on the "remote" instance
+		}
+	}
+
+	@Override
+	public void forward(Object message, ActorGateway sender) {
+		throw new UnsupportedOperationException(); // forwarding is not implemented by this base class
+	}
+
+	@Override
+	public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
+		return ask(message, timeout); // one ask only; numberRetries and the passed executionContext are ignored
+	}
+
+	@Override
+	public String path() {
+		return "BaseTestingInstanceGateway"; // NOTE(review): not this class's name (BaseTestingActorGateway) - confirm nothing relies on it
+	}
+
+	@Override
+	public ActorRef actor() {
+		return ActorRef.noSender(); // this class holds no actor reference of its own
+	}
+
+	@Override
+	public Option<UUID> leaderSessionID() {
+		return Option.empty(); // always empty: no leader session ID is stored here
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
deleted file mode 100644
index e9f8259..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import akka.dispatch.Futures;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.Callable;
-
-/**
- * Abstract base class for testing {@link InstanceGateway} instances. The implementing subclass
- * only has to provide an implementation for handleMessage which contains the logic to treat
- * different messages.
- */
-abstract public class BaseTestingInstanceGateway implements InstanceGateway {
-	/**
-	 * {@link ExecutionContext} which is used to execute the futures.
-	 */
-	private final ExecutionContext executionContext;
-
-	public BaseTestingInstanceGateway(ExecutionContext executionContext) {
-		this.executionContext = executionContext;
-	}
-
-	@Override
-	public Future<Object> ask(Object message, FiniteDuration timeout) {
-		try {
-			final Object result = handleMessage(message);
-
-			return Futures.future(new Callable<Object>() {
-				@Override
-				public Object call() throws Exception {
-					return result;
-				}
-			}, executionContext);
-
-		} catch (final Exception e) {
-			// if an exception occurred in the handleMessage method then return it as part of the future
-			return Futures.future(new Callable<Object>() {
-				@Override
-				public Object call() throws Exception {
-					throw e;
-				}
-			}, executionContext);
-		}
-	}
-
-	/**
-	 * Handles the supported messages by this InstanceGateway
-	 *
-	 * @param message Message to handle
-	 * @return Result
-	 * @throws Exception
-	 */
-	abstract public Object handleMessage(Object message) throws Exception;
-
-	@Override
-	public void tell(Object message) {}
-
-	@Override
-	public void forward(Object message, ActorRef sender) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
-		return ask(message, timeout);
-	}
-
-	@Override
-	public String path() {
-		throw new UnsupportedOperationException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
new file mode 100644
index 0000000..10762f2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorRef;
+import scala.Option;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+
+/**
+ * No-op {@link ActorGateway} for tests: tell/forward do nothing and ask/retry return null.
+ */
+public class DummyActorGateway implements ActorGateway {
+	public static final DummyActorGateway INSTANCE = new DummyActorGateway(); // shared instance; the class has no instance fields
+
+	@Override
+	public Future<Object> ask(Object message, FiniteDuration timeout) {
+		return null; // NOTE(review): a null Future, not a completed or failed one - callers must not dereference it
+	}
+
+	@Override
+	public void tell(Object message) {} // no-op: the message is dropped
+
+	@Override
+	public void tell(Object message, ActorGateway sender) {} // no-op: message and sender are ignored
+
+	@Override
+	public void forward(Object message, ActorGateway sender) {} // no-op: nothing is forwarded
+
+	@Override
+	public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
+		return null; // NOTE(review): same null Future as ask; no attempt is made at all
+	}
+
+	@Override
+	public String path() {
+		return "DummyInstanceGateway"; // NOTE(review): not this class's name (DummyActorGateway) - confirm nothing relies on it
+	}
+
+	@Override
+	public ActorRef actor() {
+		return ActorRef.noSender(); // this class holds no actor reference of its own
+	}
+
+	@Override
+	public Option<UUID> leaderSessionID() {
+		return Option.<UUID>empty(); // always empty: no leader session ID is stored here
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
deleted file mode 100644
index 5941201..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Dummy {@link InstanceGateway} implementation used for testing.
- */
-public class DummyInstanceGateway implements InstanceGateway {
-	public static final DummyInstanceGateway INSTANCE = new DummyInstanceGateway();
-
-	@Override
-	public Future<Object> ask(Object message, FiniteDuration timeout) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public void tell(Object message) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public void forward(Object message, ActorRef sender) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public String path() {
-		return "DummyInstanceGateway";
-	}
-}