You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/16 08:48:14 UTC

[2/2] flink git commit: [FLINK-4360] [tm] Implement TM -> JM registration logic

[FLINK-4360] [tm] Implement TM -> JM registration logic

Upon requesting a slot for a new job, the TaskManager registers this job at the
JobLeaderService. The job leader service is responsible for monitoring job leader changes
for all registered jobs. In case of a new job leader, the service will try to establish
a connection to the new job leader. Upon establishing the connection the task manager
is informed about it. The task manager will then offer all allocated but not yet active
slots to the new job leader.

Implement JobLeaderService

The JobLeaderService is responsible for establishing a connection to the JM leader of a given
job.

Disable TaskExecutorTest#testRejectAllocationRequestsForOutOfSyncSlots

Add simple task submission test

Add job leader detection test case

Add task slot acceptance test

Fix RpcCompletenessTest

Add comments

This closes #2640.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2486d378
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2486d378
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2486d378

Branch: refs/heads/flip-6
Commit: 2486d3787b0d45a6220d3dcc96ca665986c47837
Parents: 9da76dc
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Oct 5 17:02:06 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 16 10:47:40 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/util/ReflectionUtil.java   | 110 ++++
 .../deployment/TaskDeploymentDescriptor.java    |  11 +-
 .../runtime/executiongraph/ExecutionVertex.java |   2 +
 .../HighAvailabilityServices.java               |   3 +-
 .../runtime/highavailability/NonHaServices.java |   4 +-
 .../highavailability/ZookeeperHaServices.java   |   2 +-
 .../jobmaster/JMTMRegistrationSuccess.java      |  45 ++
 .../flink/runtime/jobmaster/JobMaster.java      |  19 +
 .../runtime/jobmaster/JobMasterGateway.java     |  36 ++
 .../registration/RegisteredRpcConnection.java   |   2 +-
 .../resourcemanager/ResourceManager.java        |   2 +-
 .../slotmanager/SlotManager.java                |   8 +-
 .../runtime/taskexecutor/JobLeaderListener.java |  60 +++
 .../runtime/taskexecutor/JobLeaderService.java  | 390 ++++++++++++++
 .../taskexecutor/JobManagerConnection.java      |  23 +-
 .../runtime/taskexecutor/JobManagerTable.java   |  59 +++
 .../runtime/taskexecutor/TaskExecutor.java      | 522 ++++++++++++++-----
 .../taskexecutor/TaskExecutorGateway.java       |  25 +-
 ...TaskExecutorToResourceManagerConnection.java |   5 +
 .../runtime/taskexecutor/TaskManagerRunner.java |   2 +
 .../taskexecutor/TaskManagerServices.java       |  24 +-
 .../exceptions/SlotAllocationException.java     |  39 ++
 .../taskexecutor/slot/TaskSlotTable.java        |  39 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   2 +-
 .../TaskDeploymentDescriptorTest.java           |   6 +-
 .../TestingHighAvailabilityServices.java        |   2 +-
 .../metrics/groups/TaskManagerGroupTest.java    |  10 +-
 .../slotmanager/SlotManagerTest.java            |   2 +-
 .../slotmanager/SlotProtocolTest.java           |   8 +-
 .../flink/runtime/rpc/RpcCompletenessTest.java  |  29 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 431 ++++++++++++++-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   3 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  75 +--
 .../flink/runtime/taskmanager/TaskStopTest.java |   2 +-
 .../flink/runtime/taskmanager/TaskTest.java     |  23 +-
 .../tasks/InterruptSensitiveRestoreTest.java    |  38 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  25 +-
 37 files changed, 1805 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
index b851eba..2883570 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
@@ -23,6 +23,9 @@ import org.apache.flink.annotation.Internal;
 
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
 
 @Internal
 public final class ReflectionUtil {
@@ -151,6 +154,113 @@ public final class ReflectionUtil {
 	}
 
 	/**
+	 * Extract the full template type information from the given type's template parameter at the
+	 * given position.
+	 *
+	 * @param type type to extract the full template parameter information from
+	 * @param templatePosition position of the template type parameter to extract
+	 * @return Full type information describing the template parameter's type
+	 */
+	public static FullTypeInfo getFullTemplateType(Type type, int templatePosition) {
+		if (type instanceof ParameterizedType) {
+			return getFullTemplateType(((ParameterizedType) type).getActualTypeArguments()[templatePosition]);
+		} else {
+			throw new IllegalArgumentException();
+		}
+	}
+
+	/**
+	 * Extract the full type information from the given type.
+	 *
+	 * @param type to be analyzed
+	 * @return Full type information describing the given type
+	 */
+	public static FullTypeInfo getFullTemplateType(Type type) {
+		if (type instanceof ParameterizedType) {
+			ParameterizedType parameterizedType = (ParameterizedType) type;
+
+			FullTypeInfo[] templateTypeInfos = new FullTypeInfo[parameterizedType.getActualTypeArguments().length];
+
+			for (int i = 0; i < parameterizedType.getActualTypeArguments().length; i++) {
+				templateTypeInfos[i] = getFullTemplateType(parameterizedType.getActualTypeArguments()[i]);
+			}
+
+			return new FullTypeInfo((Class<?>)parameterizedType.getRawType(), templateTypeInfos);
+		} else {
+			return new FullTypeInfo((Class<?>) type, null);
+		}
+	}
+
+	/**
+	 * Container for the full type information of a type. This means that it contains the
+	 * {@link Class} object and for each template parameter it contains a full type information
+	 * describing the type.
+	 */
+	public static class FullTypeInfo {
+		private final Class<?> clazz;
+		private final FullTypeInfo[] templateTypeInfos;
+
+
+		public FullTypeInfo(Class<?> clazz, FullTypeInfo[] templateTypeInfos) {
+			this.clazz = Preconditions.checkNotNull(clazz);
+			this.templateTypeInfos = templateTypeInfos;
+		}
+
+		public Class<?> getClazz() {
+			return clazz;
+		}
+
+		public FullTypeInfo[] getTemplateTypeInfos() {
+			return templateTypeInfos;
+		}
+
+		public Iterator<Class<?>> getClazzIterator() {
+			UnionIterator<Class<?>> unionIterator = new UnionIterator<>();
+
+			unionIterator.add(Collections.<Class<?>>singleton(clazz).iterator());
+
+			if (templateTypeInfos != null) {
+				for (int i = 0; i < templateTypeInfos.length; i++) {
+					unionIterator.add(templateTypeInfos[i].getClazzIterator());
+				}
+			}
+
+			return unionIterator;
+		}
+
+		@Override
+		public String toString() {
+			StringBuilder builder = new StringBuilder();
+
+			builder.append(clazz.getSimpleName());
+
+			if (templateTypeInfos != null) {
+				builder.append("<");
+
+				for (int i = 0; i < templateTypeInfos.length - 1; i++) {
+					builder.append(templateTypeInfos[i]).append(", ");
+				}
+
+				builder.append(templateTypeInfos[templateTypeInfos.length - 1]);
+				builder.append(">");
+			}
+
+			return builder.toString();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof FullTypeInfo) {
+				FullTypeInfo other = (FullTypeInfo) obj;
+
+				return clazz == other.getClazz() && Arrays.equals(templateTypeInfos, other.getTemplateTypeInfos());
+			} else {
+				return false;
+			}
+		}
+	}
+
+	/**
 	 * Private constructor to prevent instantiation.
 	 */
 	private ReflectionUtil() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index b1ac665..884d632 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -59,7 +59,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	private final ExecutionAttemptID executionId;
 
 	/** The allocation ID of the slot in which the task shall be run */
-	private final AllocationID allocationID;
+	private final AllocationID allocationId;
 
 	/** The task's name. */
 	private final String taskName;
@@ -115,6 +115,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	 */
 	public TaskDeploymentDescriptor(
 		JobID jobID,
+		AllocationID allocationId,
 		String jobName,
 		JobVertexID vertexID,
 		ExecutionAttemptID executionId,
@@ -142,6 +143,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		checkArgument(attemptNumber >= 0);
 
 		this.jobID = checkNotNull(jobID);
+		this.allocationId = checkNotNull(allocationId);
 		this.jobName = checkNotNull(jobName);
 		this.vertexID = checkNotNull(vertexID);
 		this.executionId = checkNotNull(executionId);
@@ -162,11 +164,11 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		this.operatorState = operatorState;
 		this.keyGroupState = keyGroupState;
 		this.partitionableOperatorState = partitionableOperatorStateHandles;
-		this.allocationID = new AllocationID();
 	}
 
 	public TaskDeploymentDescriptor(
 		JobID jobID,
+		AllocationID allocationId,
 		String jobName,
 		JobVertexID vertexID,
 		ExecutionAttemptID executionId,
@@ -187,6 +189,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 
 		this(
 			jobID,
+			allocationId,
 			jobName,
 			vertexID,
 			executionId,
@@ -327,8 +330,8 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		return requiredClasspaths;
 	}
 
-	public AllocationID getAllocationID() {
-		return allocationID;
+	public AllocationID getAllocationId() {
+		return allocationId;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 4837803..708a72c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -666,6 +667,7 @@ public class ExecutionVertex {
 
 		return new TaskDeploymentDescriptor(
 			getJobId(),
+			new AllocationID(), // TODO: Obtain the proper allocation id from the slot
 			getExecutionGraph().getJobName(),
 			getJobvertexId(),
 			executionId,

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 5d78ffc..484cddb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -49,10 +49,11 @@ public interface HighAvailabilityServices {
 	 * Gets the leader retriever for the job JobMaster which is responsible for the given job
 	 *
 	 * @param jobID The identifier of the job.
+	 * @param defaultAddress address under which the job manager is reachable
 	 * @return
 	 * @throws Exception
 	 */
-	LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception;
+	LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception;
 
 	/**
 	 * Gets the leader election service for the cluster's resource manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index d7fd2bf..1c73c01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -82,8 +82,8 @@ public class NonHaServices implements HighAvailabilityServices {
 	}
 
 	@Override
-	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
-		return new StandaloneLeaderRetrievalService(jobMastersAddress.get(jobID), new UUID(0, 0));
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception {
+		return new StandaloneLeaderRetrievalService(defaultAddress, new UUID(0, 0));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index 3a7736b..bbe8ecb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -111,7 +111,7 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 	}
 
 	@Override
-	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception {
 		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JMTMRegistrationSuccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JMTMRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JMTMRegistrationSuccess.java
new file mode 100644
index 0000000..7272cd4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JMTMRegistrationSuccess.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.util.Preconditions;
+
+public class JMTMRegistrationSuccess extends RegistrationResponse.Success {
+	private static final long serialVersionUID = -3528383155961318929L;
+
+	private final ResourceID resourceID;
+	private final int blobPort;
+
+	public JMTMRegistrationSuccess(ResourceID resourceID, int blobPort) {
+		Preconditions.checkArgument(0 < blobPort && 65536 > blobPort, "The blob port has to be 0 < blobPort < 65536.");
+
+		this.resourceID = Preconditions.checkNotNull(resourceID);
+		this.blobPort = blobPort;
+	}
+
+	public ResourceID getResourceID() {
+		return resourceID;
+	}
+
+	public int getBlobPort() {
+		return blobPort;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index e11f3a1..a7be476 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
@@ -646,6 +647,24 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				executionGraph.getRequiredClasspaths());
 	}
 
+	@RpcMethod
+	public Iterable<AllocationID> offerSlots(final Iterable<AllocationID> slots, UUID leaderId) {
+		throw new UnsupportedOperationException("Has to be implemented.");
+	}
+
+	@RpcMethod
+	public void failSlot(final AllocationID allocationId, UUID leaderId, Exception cause) {
+		throw new UnsupportedOperationException("Has to be implemented.");
+	}
+
+	@RpcMethod
+	public RegistrationResponse registerTaskManager(
+		final String taskManagerAddress,
+		final ResourceID taskManagerProcessId,
+		final UUID leaderId) {
+		throw new UnsupportedOperationException("Has to be implemented.");
+	}
+
 	//----------------------------------------------------------------------------------------------
 	// Internal methods
 	//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index b27b41c..0f155a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -35,6 +36,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
@@ -170,4 +172,38 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * Request the classloading props of this job.
 	 */
 	Future<ClassloadingProps> requestClassloadingProps();
+
+	/**
+	 * Offer the given slots to the job manager. The response contains the set of accepted slots.
+	 *
+	 * @param slots to offer to the job manager
+	 * @param leaderId identifying the job leader
+	 * @param timeout for the rpc call
+	 * @return Future set of accepted slots.
+	 */
+	Future<Iterable<AllocationID>> offerSlots(final Iterable<AllocationID> slots, UUID leaderId, @RpcTimeout final Time timeout);
+
+	/**
+	 * Fail the slot with the given allocation id and cause.
+	 *
+	 * @param allocationId identifying the slot to fail
+	 * @param leaderId identifying the job leader
+	 * @param cause of the failure
+	 */
+	void failSlot(final AllocationID allocationId, UUID leaderId, Exception cause);
+
+	/**
+	 * Register the task manager at the job manager.
+	 *
+	 * @param taskManagerAddress address of the task manager
+	 * @param taskManagerProcessId identifying the task manager
+	 * @param leaderId identifying the job leader
+	 * @param timeout for the rpc call
+	 * @return Future registration response indicating whether the registration was successful or not
+	 */
+	Future<RegistrationResponse> registerTaskManager(
+		final String taskManagerAddress,
+		final ResourceID taskManagerProcessId,
+		final UUID leaderId,
+		@RpcTimeout final Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
index 76093b0..78d4dbc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
@@ -35,7 +35,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * This utility class implements the basis of RPC connecting from one component to another component,
  * for example the RPC connection from TaskExecutor to ResourceManager.
- * This {@code RegisteredRpcConnection} implements registration and get target gateway .
+ * This {@code RegisteredRpcConnection} implements registration and get target gateway.
  *
  * <p>The registration gives access to a future that is completed upon successful registration.
  * The RPC connection can be closed, for example when the target where it tries to register

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3122804..6f6d525 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -188,7 +188,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		} else {
 			try {
 				LeaderRetrievalService jobMasterLeaderRetriever =
-					highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
+					highAvailabilityServices.getJobManagerLeaderRetriever(jobID, jobMasterAddress);
 				jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
 			} catch (Exception e) {
 				log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index e312ea2..f055971 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -298,7 +298,13 @@ public abstract class SlotManager {
 		final TaskExecutorRegistration registration = freeSlot.getTaskExecutorRegistration();
 		final Future<TMSlotRequestReply> slotRequestReplyFuture =
 			registration.getTaskExecutorGateway()
-				.requestSlot(freeSlot.getSlotId(), allocationID, rmServices.getLeaderID(), timeout);
+				.requestSlot(
+					freeSlot.getSlotId(),
+					slotRequest.getJobId(),
+					allocationID,
+					"foobar", // TODO: set proper JM address
+					rmServices.getLeaderID(),
+					timeout);
 
 		slotRequestReplyFuture.handleAsync(new BiFunction<TMSlotRequestReply, Throwable, Void>() {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java
new file mode 100644
index 0000000..f02a8c2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+
+import java.util.UUID;
+
+/**
+ * Listener for the {@link JobLeaderService}. The listener is notified whenever a job manager
+ * gained leadership for a registered job and the service could establish a connection to it.
+ * Furthermore, the listener is notified when a job manager loses leadership for a job. In case
+ * of an error, {@link #handleError(Throwable)} is called.
+ */
+public interface JobLeaderListener {
+
+	/**
+	 * Callback if a job manager has gained leadership for the job identified by the job id and a
+	 * connection could be established to this job manager.
+	 *
+	 * @param jobId identifying the job for which the job manager has gained leadership
+	 * @param jobManagerGateway to the job leader
+	 * @param jobLeaderId new leader id of the job leader
+	 * @param registrationMessage containing further registration information
+	 */
+	void jobManagerGainedLeadership(JobID jobId, JobMasterGateway jobManagerGateway, UUID jobLeaderId, JMTMRegistrationSuccess registrationMessage);
+
+	/**
+	 * Callback if the job leader for the job with the given job id lost its leadership.
+	 *
+	 * @param jobId identifying the job whose leader has lost leadership
+	 * @param jobLeaderId old leader id
+	 */
+	void jobManagerLostLeadership(JobID jobId, UUID jobLeaderId);
+
+	/**
+	 * Callback for errors which might occur in the {@link JobLeaderService}.
+	 *
+	 * @param throwable cause
+	 */
+	void handleError(Throwable throwable);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
new file mode 100644
index 0000000..9e71349
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.registration.RegisteredRpcConnection;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.registration.RetryingRegistration;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+/**
+ * This service has the responsibility to monitor the job leaders (the job manager which is leader
+ * for a given job) for all registered jobs. Upon gaining leadership for a job and detection by the
+ * job leader service, the service tries to establish a connection to the job leader. After
+ * successfully establishing a connection, the job leader listener is notified about the new job
+ * leader and its connection. In case that a job leader loses leadership, the job leader listener
+ * is notified as well.
+ */
+public class JobLeaderService {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JobLeaderService.class);
+
+	/** Process id of the owning process */
+	private final ResourceID ownerProcessId;
+
+	/** The leader retrieval service and listener for each registered job */
+	private final Map<JobID, Tuple2<LeaderRetrievalService, JobLeaderService.JobManagerLeaderListener>> jobLeaderServices;
+
+	/** Internal state of the service */
+	private volatile JobLeaderService.State state;
+
+	/** Address of the owner of this service. This address is used for the job manager connection */
+	private String ownerAddress;
+
+	/** Rpc service to use for establishing connections */
+	private RpcService rpcService;
+
+	/** High availability services to create the leader retrieval services from */
+	private HighAvailabilityServices highAvailabilityServices;
+
+	/** Job leader listener listening for job leader changes */
+	private JobLeaderListener jobLeaderListener;
+
+	public JobLeaderService(ResourceID ownerProcessId) {
+		this.ownerProcessId = Preconditions.checkNotNull(ownerProcessId);
+
+		// these collaborators are only handed in when the service is started
+		this.ownerAddress = null;
+		this.rpcService = null;
+		this.highAvailabilityServices = null;
+		this.jobLeaderListener = null;
+
+		// no job is monitored yet and the service still has to be started
+		this.jobLeaderServices = new HashMap<>(4);
+		this.state = JobLeaderService.State.CREATED;
+	}
+
+	// -------------------------------------------------------------------------------
+	// Methods
+	// -------------------------------------------------------------------------------
+
+	/**
+	 * Start the job leader service with the given services.
+	 *
+	 * @param initialOwnerAddress to be used for establishing connections (source address)
+	 * @param initialRpcService to be used to create rpc connections
+	 * @param initialHighAvailabilityServices to create leader retrieval services for the different jobs
+	 * @param initialJobLeaderListener listening for job leader changes
+	 */
+	public void start(
+		final String initialOwnerAddress,
+		final RpcService initialRpcService,
+		final HighAvailabilityServices initialHighAvailabilityServices,
+		final JobLeaderListener initialJobLeaderListener) {
+
+		// the service may only be started once, directly from its initial state
+		if (state != JobLeaderService.State.CREATED) {
+			throw new IllegalStateException("The service has already been started.");
+		}
+
+		LOG.info("Start job leader service.");
+
+		ownerAddress = Preconditions.checkNotNull(initialOwnerAddress);
+		rpcService = Preconditions.checkNotNull(initialRpcService);
+		highAvailabilityServices = Preconditions.checkNotNull(initialHighAvailabilityServices);
+		jobLeaderListener = Preconditions.checkNotNull(initialJobLeaderListener);
+
+		// only mark the service as started once all collaborators have been accepted
+		state = JobLeaderService.State.STARTED;
+	}
+
+	/**
+	 * Stop the job leader services. This implies stopping all leader retrieval services for the
+	 * different jobs and their leader retrieval listeners.
+	 *
+	 * @throws Exception if an error occurs while stopping the service
+	 */
+	public void stop() throws Exception {
+		LOG.info("Stop job leader service.");
+
+		final boolean running = JobLeaderService.State.STARTED == state;
+
+		if (running) {
+			// shut down the listener and the leader retrieval service of every monitored job
+			for (Tuple2<LeaderRetrievalService, JobLeaderService.JobManagerLeaderListener> entry : jobLeaderServices.values()) {
+				entry.f1.stop();
+				entry.f0.stop();
+			}
+
+			jobLeaderServices.clear();
+		}
+
+		// reached for a started as well as for a never started service
+		state = JobLeaderService.State.STOPPED;
+	}
+
+	/**
+	 * Check whether the service monitors the given job.
+	 *
+	 * @param jobId identifying the job
+	 * @return True if the given job is monitored; otherwise false
+	 */
+	public boolean containsJob(JobID jobId) {
+		final boolean running = JobLeaderService.State.STARTED == state;
+
+		// querying is only allowed between start and stop
+		Preconditions.checkState(running, "The service is currently not running.");
+
+		return jobLeaderServices.containsKey(jobId);
+	}
+
+	/**
+	 * Remove the given job from being monitored by the job leader service.
+	 *
+	 * @param jobId identifying the job to remove from monitoring
+	 * @throws Exception if an error occurred while stopping the leader retrieval service and listener
+	 */
+	public void removeJob(JobID jobId) throws Exception {
+		Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running.");
+
+		final Tuple2<LeaderRetrievalService, JobLeaderService.JobManagerLeaderListener> removedEntry = jobLeaderServices.remove(jobId);
+
+		if (removedEntry == null) {
+			// the job was not monitored, hence there is nothing to shut down
+			return;
+		}
+
+		LOG.info("Remove job {} from job leader monitoring.", jobId);
+
+		// first stop the retrieval of new leaders, then the listener with its connection
+		removedEntry.f0.stop();
+		removedEntry.f1.stop();
+	}
+
+	/**
+	 * Add the given job to be monitored. This means that the service tries to detect leaders for
+	 * this job and then tries to establish a connection to it.
+	 *
+	 * <p>If the job is already monitored, the previously registered leader retrieval service and
+	 * its listener are stopped and replaced by new ones, so that they are not left running
+	 * without any handle to stop them.
+	 *
+	 * @param jobId identifying the job to monitor
+	 * @param defaultTargetAddress of the job leader
+	 * @throws Exception if an error occurs while starting the leader retrieval service or while
+	 *                   stopping a previously registered one
+	 */
+	public void addJob(final JobID jobId, final String defaultTargetAddress) throws Exception {
+		Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running.");
+
+		LOG.info("Add job {} for job leader monitoring.", jobId);
+
+		final LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
+			jobId,
+			defaultTargetAddress);
+
+		final JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobId);
+
+		// Shut down an existing registration for the same job. Merely overwriting the map entry
+		// would drop the only references to the old retrieval service and listener while both
+		// are still running.
+		final Tuple2<LeaderRetrievalService, JobLeaderService.JobManagerLeaderListener> previousEntry = jobLeaderServices.remove(jobId);
+
+		if (previousEntry != null) {
+			LOG.debug("Job {} is already monitored. Replacing its leader retrieval service.", jobId);
+
+			previousEntry.f0.stop();
+			previousEntry.f1.stop();
+		}
+
+		leaderRetrievalService.start(jobManagerLeaderListener);
+
+		jobLeaderServices.put(jobId, Tuple2.of(leaderRetrievalService, jobManagerLeaderListener));
+	}
+
+	/**
+	 * Leader listener which tries to establish a connection to a newly detected job leader.
+	 */
+	private final class JobManagerLeaderListener implements LeaderRetrievalListener {
+
+		/** Job id identifying the job to look for a leader */
+		private final JobID jobId;
+
+		/** Rpc connection to the job leader */
+		private RegisteredRpcConnection<JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
+
+		/** State of the listener */
+		private volatile boolean stopped;
+
+		/** Leader id of the current job leader */
+		private volatile UUID currentLeaderId;
+
+		private JobManagerLeaderListener(JobID jobId) {
+			this.jobId = Preconditions.checkNotNull(jobId);
+
+			// a fresh listener is running, knows no leader and has no connection yet
+			this.currentLeaderId = null;
+			this.rpcConnection = null;
+			this.stopped = false;
+		}
+
+		public void stop() {
+			// set the flag first so that later leader notifications are ignored
+			stopped = true;
+
+			if (rpcConnection == null) {
+				// no connection has been created so far
+				return;
+			}
+
+			rpcConnection.close();
+		}
+
+		/**
+		 * Reacts to new leader information for the monitored job.
+		 *
+		 * <p>A {@code null} or empty address means that the job currently has no leader: the
+		 * connection to the old leader is closed and discarded, and the job leader listener is
+		 * told about the lost leadership. Otherwise a connection to the announced leader is
+		 * created and started, unless the current connection already targets the same leader id.
+		 *
+		 * @param leaderAddress address of the new job leader; {@code null} or empty if there is none
+		 * @param leaderId leader id of the new job leader
+		 */
+		@Override
+		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderId) {
+			if (stopped) {
+				LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. " +
+					"However, the service is no longer running.", JobLeaderService.class.getSimpleName(), jobId);
+				return;
+			}
+
+			LOG.debug("New leader information for job {}. Address: {}, leader id: {}.",
+				jobId, leaderAddress, leaderId);
+
+			if (leaderAddress == null || leaderAddress.isEmpty()) {
+				// the leader lost leadership but there is no other leader yet.
+				if (rpcConnection != null) {
+					rpcConnection.close();
+
+					// Forget the closed connection. If it were kept and the same leader id was
+					// announced again, the closed connection would be taken for the current one
+					// and no new connection would be created.
+					rpcConnection = null;
+				}
+
+				jobLeaderListener.jobManagerLostLeadership(jobId, currentLeaderId);
+
+				currentLeaderId = leaderId;
+			} else {
+				currentLeaderId = leaderId;
+
+				if (rpcConnection != null && leaderId.equals(rpcConnection.getTargetLeaderId())) {
+					// The current connection already targets this leader and has been started
+					// when it was created, so there is nothing left to do.
+					LOG.debug("Ignoring duplicate leader information for job {} with leader id {}.",
+						jobId, leaderId);
+				} else {
+					if (rpcConnection != null) {
+						// the connection targets an outdated leader
+						rpcConnection.close();
+					}
+
+					rpcConnection = new JobManagerRegisteredRpcConnection(
+						LOG,
+						leaderAddress,
+						leaderId,
+						rpcService.getExecutor());
+
+					// double check for a concurrent stop operation
+					if (stopped) {
+						rpcConnection.close();
+					} else {
+						LOG.info("Try to register at job manager {} with leader id {}.", leaderAddress, leaderId);
+						rpcConnection.start();
+					}
+				}
+			}
+		}
+
+		@Override
+		public void handleError(Exception exception) {
+			if (!stopped) {
+				// still running: pass the error on to the job leader listener
+				jobLeaderListener.handleError(exception);
+				return;
+			}
+
+			// errors which arrive after the listener has been stopped are only logged
+			LOG.debug("{}'s leader retrieval listener reported an exception for job {}. " +
+					"However, the service is no longer running.", JobLeaderService.class.getSimpleName(),
+				jobId, exception);
+		}
+
+		/**
+		 * Rpc connection for the job manager <--> task manager connection.
+		 */
+		private final class JobManagerRegisteredRpcConnection extends RegisteredRpcConnection<JobMasterGateway, JMTMRegistrationSuccess> {
+
+			JobManagerRegisteredRpcConnection(
+				Logger log,
+				String targetAddress,
+				UUID targetLeaderId,
+				Executor executor) {
+				super(log, targetAddress, targetLeaderId, executor);
+			}
+
+			@Override
+			protected RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> generateRegistration() {
+				// registers the owner of the enclosing service at the targeted job manager
+				return new JobLeaderService.JobManagerRetryingRegistration(
+					LOG,
+					rpcService,
+					"JobManager",
+					JobMasterGateway.class,
+					getTargetAddress(),
+					getTargetLeaderId(),
+					ownerAddress,
+					ownerProcessId);
+			}
+
+			@Override
+			protected void onRegistrationSuccess(JMTMRegistrationSuccess success) {
+				// filter out old registration attempts
+				if (!getTargetLeaderId().equals(currentLeaderId)) {
+					log.debug("Encountered obsolete JobManager registration success from {} with leader session ID {}.", getTargetAddress(), getTargetLeaderId());
+					return;
+				}
+
+				log.info("Successful registration at job manager {} for job {}.", getTargetAddress(), jobId);
+
+				jobLeaderListener.jobManagerGainedLeadership(jobId, getTargetGateway(), getTargetLeaderId(), success);
+			}
+
+			@Override
+			protected void onRegistrationFailure(Throwable failure) {
+				// filter out old registration attempts
+				if (!getTargetLeaderId().equals(currentLeaderId)) {
+					log.debug("Obsolete JobManager registration failure from {} with leader session ID {}.", getTargetAddress(), getTargetLeaderId(), failure);
+					return;
+				}
+
+				log.info("Failed to register at job manager {} for job {}.", getTargetAddress(), jobId);
+				jobLeaderListener.handleError(failure);
+			}
+		}
+	}
+
+	/**
+	 * Retrying registration for the job manager <--> task manager connection.
+	 */
+	private static final class JobManagerRetryingRegistration extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> {
+
+		/** Address of the task manager which is sent with the registration call */
+		private final String taskManagerAddress;
+
+		/** Resource id of the task manager process which is sent with the registration call */
+		private final ResourceID taskManagerProcessId;
+
+		JobManagerRetryingRegistration(
+			Logger log,
+			RpcService rpcService,
+			String targetName,
+			Class<JobMasterGateway> targetType,
+			String targetAddress,
+			UUID leaderId,
+			String taskManagerAddress,
+			ResourceID taskManagerProcessId) {
+
+			super(log, rpcService, targetName, targetType, targetAddress, leaderId);
+
+			this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress);
+			this.taskManagerProcessId = Preconditions.checkNotNull(taskManagerProcessId);
+		}
+
+		@Override
+		protected Future<RegistrationResponse> invokeRegistration(JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception {
+			// registerTaskManager takes the timeout as a Time value
+			final Time registrationTimeout = Time.milliseconds(timeoutMillis);
+
+			return gateway.registerTaskManager(
+				taskManagerAddress,
+				taskManagerProcessId,
+				leaderId,
+				registrationTimeout);
+		}
+	}
+
+	/**
+	 * Internal state of the service
+	 */
+	private enum State {
+		/** Initial state which is set by the constructor */
+		CREATED,
+
+		/** State after the service has been started; required for adding, removing and querying jobs */
+		STARTED,
+
+		/** State after the service has been stopped */
+		STOPPED
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
index 6fcd082..8d2057a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
@@ -34,7 +34,7 @@ import java.util.UUID;
 public class JobManagerConnection {
 
 	// Job master leader session id
-	private final UUID jobMasterLeaderId;
+	private final UUID leaderId;
 
 	// Gateway to the job master
 	private final JobMasterGateway jobMasterGateway;
@@ -55,15 +55,14 @@ public class JobManagerConnection {
 	private final PartitionStateChecker partitionStateChecker;
 
 	public JobManagerConnection(
-			UUID jobMasterLeaderId,
-			JobMasterGateway jobMasterGateway,
-			TaskManagerActions taskManagerActions,
-			CheckpointResponder checkpointResponder,
-			LibraryCacheManager libraryCacheManager,
-			ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
-			PartitionStateChecker partitionStateChecker)
-	{
-		this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId);
+		JobMasterGateway jobMasterGateway,
+		UUID leaderId,
+		TaskManagerActions taskManagerActions,
+		CheckpointResponder checkpointResponder,
+		LibraryCacheManager libraryCacheManager,
+		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
+		PartitionStateChecker partitionStateChecker) {
+		this.leaderId = Preconditions.checkNotNull(leaderId);
 		this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
 		this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions);
 		this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
@@ -72,8 +71,8 @@ public class JobManagerConnection {
 		this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker);
 	}
 
-	public UUID getJobMasterLeaderId() {
-		return jobMasterLeaderId;
+	public UUID getLeaderId() {
+		return leaderId;
 	}
 
 	public JobMasterGateway getJobManagerGateway() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerTable.java
new file mode 100644
index 0000000..00c467e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerTable.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Container for multiple {@link JobManagerConnection} registered under their respective job id.
+ */
+public class JobManagerTable {
+
+	/** Job manager connections indexed by the id of the job they belong to */
+	private final Map<JobID, JobManagerConnection> jobManagerConnections;
+
+	public JobManagerTable() {
+		jobManagerConnections = new HashMap<>(4);
+	}
+
+	/**
+	 * Checks whether an entry exists for the given job.
+	 *
+	 * @param jobId identifying the job
+	 * @return True if the table contains an entry for the job; otherwise false
+	 */
+	public boolean contains(JobID jobId) {
+		return jobManagerConnections.containsKey(jobId);
+	}
+
+	/**
+	 * Registers the connection for the given job unless a connection is already registered
+	 * for it. An existing connection is never replaced.
+	 *
+	 * @param jobId identifying the job
+	 * @param jobManagerConnection to register for the job
+	 * @return True if the connection has been registered; false if another connection is
+	 *         already registered for the job
+	 */
+	public boolean put(JobID jobId, JobManagerConnection jobManagerConnection) {
+		if (jobManagerConnections.get(jobId) != null) {
+			// keep the connection which is already registered
+			return false;
+		}
+
+		jobManagerConnections.put(jobId, jobManagerConnection);
+
+		return true;
+	}
+
+	/**
+	 * Removes the connection registered for the given job.
+	 *
+	 * @param jobId identifying the job
+	 * @return The removed connection or null if none was registered
+	 */
+	public JobManagerConnection remove(JobID jobId) {
+		return jobManagerConnections.remove(jobId);
+	}
+
+	/**
+	 * Looks up the connection registered for the given job.
+	 *
+	 * @param jobId identifying the job
+	 * @return The registered connection or null if none is registered
+	 */
+	public JobManagerConnection get(JobID jobId) {
+		return jobManagerConnections.get(jobId);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index e642315..3e3a544 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -19,12 +19,14 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -34,7 +36,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
@@ -42,6 +43,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNo
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.memory.MemoryManager;
@@ -51,7 +53,6 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -59,6 +60,7 @@ import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
 import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
 import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
@@ -81,9 +83,10 @@ import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -97,6 +100,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	/** The connection information of this task manager */
 	private final TaskManagerLocation taskManagerLocation;
 
+	/** Max blob port which is accepted */
+	public static final int MAX_BLOB_PORT = 65536;
+
 	/** The access to the leader election and retrieval services */
 	private final HighAvailabilityServices haServices;
 
@@ -121,10 +127,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	private final TaskManagerMetricGroup taskManagerMetricGroup;
 
 	private final BroadcastVariableManager broadcastVariableManager;
-	
-	/** Slots which have become available but haven't been confirmed by the RM */
-	private final Set<SlotID> unconfirmedFreeSlots;
-
 
 	private final FileCache fileCache;
 
@@ -140,6 +142,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 	private final TaskSlotTable taskSlotTable;
 
+	private final JobManagerTable jobManagerTable;
+
+	private final JobLeaderService jobLeaderService;
+
 	// ------------------------------------------------------------------------
 
 	public TaskExecutor(
@@ -155,6 +161,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		BroadcastVariableManager broadcastVariableManager,
 		FileCache fileCache,
 		TaskSlotTable taskSlotTable,
+		JobManagerTable jobManagerTable,
+		JobLeaderService jobLeaderService,
 		FatalErrorHandler fatalErrorHandler) {
 
 		super(rpcService);
@@ -173,10 +181,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
 		this.broadcastVariableManager = checkNotNull(broadcastVariableManager);
 		this.fileCache = checkNotNull(fileCache);
+		this.jobManagerTable = checkNotNull(jobManagerTable);
+		this.jobLeaderService = checkNotNull(jobLeaderService);
 
 		this.jobManagerConnections = new HashMap<>(4);
-
-		this.unconfirmedFreeSlots = new HashSet<>();
 	}
 
 	// ------------------------------------------------------------------------
@@ -195,7 +203,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 
 		// tell the task slot table who's responsible for the task slot actions
-		taskSlotTable.start(new SlotActionsImpl(), taskManagerConfiguration.getTimeout());
+		taskSlotTable.start(new SlotActionsImpl());
+
+		// start the job leader service
+		jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
 	}
 
 	/**
@@ -207,7 +218,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 		taskSlotTable.stop();
 
-		if (resourceManagerConnection.isConnected()) {
+		if (isConnectedToResourceManager()) {
 			try {
 				resourceManagerConnection.close();
 			} catch (Exception e) {
@@ -248,30 +259,39 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		log.info("Stopped TaskManager {}.", getAddress());
 	}
 
-	// ========================================================================
+	// ======================================================================
 	//  RPC methods
-	// ========================================================================
+	// ======================================================================
 
 	// ----------------------------------------------------------------------
 	// Task lifecycle RPCs
 	// ----------------------------------------------------------------------
 
 	@RpcMethod
-	public Acknowledge submitTask(TaskDeploymentDescriptor tdd, ResourceID jobManagerID) throws TaskSubmissionException {
+	public Acknowledge submitTask(TaskDeploymentDescriptor tdd, UUID jobManagerLeaderId) throws TaskSubmissionException {
 
-		JobManagerConnection jobManagerConnection = getJobManagerConnection(jobManagerID);
+		JobManagerConnection jobManagerConnection = jobManagerTable.get(tdd.getJobID());
 
 		if (jobManagerConnection == null) {
-			final String message = "Could not submit task because JobManager " + jobManagerID +
-				" was not associated.";
+			final String message = "Could not submit task because there is no JobManager " +
+				"associated for the job " + tdd.getJobID() + '.';
+
+			log.debug(message);
+			throw new TaskSubmissionException(message);
+		}
+
+		if (!jobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) {
+			final String message = "Rejecting the task submission because the job manager leader id " +
+				jobManagerLeaderId + " does not match the expected job manager leader id " +
+				jobManagerConnection.getLeaderId() + '.';
 
 			log.debug(message);
 			throw new TaskSubmissionException(message);
 		}
 
-		if (!taskSlotTable.existActiveSlot(tdd.getJobID(), tdd.getAllocationID())) {
+		if (!taskSlotTable.existsActiveSlot(tdd.getJobID(), tdd.getAllocationId())) {
 			final String message = "No task slot allocated for job ID " + tdd.getJobID() +
-				" and allocation ID " + tdd.getAllocationID() + '.';
+				" and allocation ID " + tdd.getAllocationId() + '.';
 			log.debug(message);
 			throw new TaskSubmissionException(message);
 		}
@@ -279,7 +299,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(tdd);
 
 		InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
-				jobManagerConnection.getJobMasterLeaderId(),
+				jobManagerConnection.getLeaderId(),
 				jobManagerConnection.getJobManagerGateway(),
 				tdd.getJobID(),
 				tdd.getVertexID(),
@@ -375,7 +395,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	// ----------------------------------------------------------------------
 
 	@RpcMethod
-	public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Collection<PartitionInfo> partitionInfos) throws PartitionException {
+	public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos) throws PartitionException {
 		final Task task = taskSlotTable.getTask(executionAttemptID);
 
 		if (task != null) {
@@ -471,38 +491,319 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
+	// ----------------------------------------------------------------------
+	// Slot allocation RPCs
+	// ----------------------------------------------------------------------
+
 	/**
 	 * Requests a slot from the TaskManager
 	 *
-	 * @param slotID Slot id for the request
-	 * @param allocationID id for the request
-	 * @param resourceManagerLeaderID current leader id of the ResourceManager
+	 * @param slotId identifying the requested slot
+	 * @param jobId identifying the job for which the request is issued
+	 * @param allocationId id for the request
+	 * @param targetAddress of the job manager requesting the slot
+	 * @param rmLeaderId current leader id of the ResourceManager
+	 * @throws SlotAllocationException if the slot allocation fails
 	 * @return answer to the slot request
 	 */
 	@RpcMethod
-	public TMSlotRequestReply requestSlot(SlotID slotID, AllocationID allocationID, UUID resourceManagerLeaderID) {
-		if (!resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderID)) {
-			return new TMSlotRequestRejected(
-				resourceManagerConnection.getRegistrationId(), getResourceID(), allocationID);
+	public TMSlotRequestReply requestSlot(
+		final SlotID slotId,
+		final JobID jobId,
+		final AllocationID allocationId,
+		final String targetAddress,
+		final UUID rmLeaderId) throws SlotAllocationException {
+		log.info("Receive slot request {} for job {} from resource manager with leader id {}.",
+			allocationId, jobId, rmLeaderId);
+
+		if (resourceManagerConnection == null) {
+			final String message = "TaskManager is not connected to a resource manager.";
+			log.debug(message);
+			throw new SlotAllocationException(message);
 		}
-		if (unconfirmedFreeSlots.contains(slotID)) {
-			// check if request has not been blacklisted because the notification of a free slot
-			// has not been confirmed by the ResourceManager
-			return new TMSlotRequestRejected(
-				resourceManagerConnection.getRegistrationId(), getResourceID(), allocationID);
+
+		if (!resourceManagerConnection.getTargetLeaderId().equals(rmLeaderId)) {
+			final String message = "The leader id " + rmLeaderId +
+				" does not match with the leader id of the connected resource manager " +
+				resourceManagerConnection.getTargetLeaderId() + '.';
+
+			log.debug(message);
+			throw new SlotAllocationException(message);
+		}
+
+		if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
+			if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, taskManagerConfiguration.getTimeout())) {
+				log.info("Allocated slot for {}.", allocationId);
+			} else {
+				log.info("Could not allocate slot for {}.", allocationId);
+				throw new SlotAllocationException("Could not allocate slot.");
+			}
+		} else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
+			final String message = "The slot " + slotId + " has already been allocated for a different job.";
+
+			log.info(message);
+			throw new SlotAllocationException(message);
+		}
+
+		if (jobManagerTable.contains(jobId)) {
+			offerSlotsToJobManager(jobId);
+		} else {
+			try {
+				jobLeaderService.addJob(jobId, targetAddress);
+			} catch (Exception e) {
+				// free the allocated slot
+				try {
+					taskSlotTable.freeSlot(allocationId);
+				} catch (SlotNotFoundException slotNotFoundException) {
+					// slot no longer existent, this should actually never happen, because we've
+					// just allocated the slot. So let's fail hard in this case!
+					onFatalError(slotNotFoundException);
+				}
+
+				// sanity check
+				if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
+					onFatalError(new Exception("Could not free slot " + slotId));
+				}
+
+				throw new SlotAllocationException("Could not add job to job leader service.", e);
+			}
 		}
-		return new TMSlotRequestRegistered(new InstanceID(), ResourceID.generate(), allocationID);
 
+		return new TMSlotRequestRegistered(resourceManagerConnection.getRegistrationId(), getResourceID(), allocationId);
 	}
 
-	// ------------------------------------------------------------------------
+	// ======================================================================
 	//  Internal methods
+	// ======================================================================
+
+	// ------------------------------------------------------------------------
+	//  Internal resource manager connection methods
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Handles a change of the ResourceManager leader. An existing connection (or pending
+	 * connection attempt) is closed and discarded. If a new leader address is given, a new
+	 * connection to that leader is created and started.
+	 *
+	 * @param newLeaderAddress address of the new ResourceManager leader; {@code null} if the
+	 *                         current leader was lost and there is no new leader yet
+	 * @param newLeaderId leader id of the new ResourceManager leader
+	 */
+	private void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLeaderId) {
+		if (resourceManagerConnection != null) {
+			if (newLeaderAddress != null) {
+				// the resource manager switched to a new leader
+				log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
+					resourceManagerConnection.getTargetAddress(), newLeaderAddress);
+			} else {
+				// address null means that the current leader is lost without a new leader being there, yet
+				log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
+					resourceManagerConnection.getTargetAddress());
+			}
+
+			// drop the current connection or connection attempt; the enclosing guard already
+			// guarantees that the connection is non-null here
+			resourceManagerConnection.close();
+			resourceManagerConnection = null;
+		}
+
+		// establish a connection to the new leader
+		if (newLeaderAddress != null) {
+			log.info("Attempting to register at ResourceManager {}", newLeaderAddress);
+			resourceManagerConnection =
+				new TaskExecutorToResourceManagerConnection(
+					log,
+					this,
+					newLeaderAddress,
+					newLeaderId,
+					getMainThreadExecutor());
+			resourceManagerConnection.start();
+		}
+	}
+
 	// ------------------------------------------------------------------------
+	//  Internal job manager connection methods
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Offers the slots which are allocated for the given job to the job's current leader.
+	 *
+	 * <p>Nothing is offered if there is no job manager connection for the job or if the task
+	 * slot table reports no allocated slots for it. The response is processed on the main
+	 * thread executor: slots accepted by the job manager are marked active, all remaining
+	 * offered slots are freed. A response which arrives after the job leader has changed is
+	 * discarded. If the offer times out it is retried; any other failure frees the offered
+	 * slots.
+	 *
+	 * @param jobId id of the job whose allocated slots are offered
+	 */
+	private void offerSlotsToJobManager(final JobID jobId) {
+		final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);
+
+		if (jobManagerConnection == null) {
+			log.debug("There is no job manager connection to the leader of job {}.", jobId);
+		} else {
+			if (taskSlotTable.hasAllocatedSlots(jobId)) {
+				log.info("Offer reserved slots to the leader of job {}.", jobId);
+
+				final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
+
+				final Iterator<AllocationID> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
+				final UUID leaderId = jobManagerConnection.getLeaderId();
+
+				// snapshot of the offered slots; accepted slots are removed from it below so
+				// that only the left overs remain
+				final Collection<AllocationID> reservedSlots = new HashSet<>(2);
+
+				while (reservedSlotsIterator.hasNext()) {
+					reservedSlots.add(reservedSlotsIterator.next());
+				}
+
+				Future<Iterable<AllocationID>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
+					reservedSlots,
+					leaderId,
+					taskManagerConfiguration.getTimeout());
+
+				acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<AllocationID>>() {
+					@Override
+					public void accept(Iterable<AllocationID> acceptedSlots) {
+						// check if the response is still valid
+						if (isJobManagerConnectionValid(jobId, leaderId)) {
+							// mark accepted slots active
+							for (AllocationID acceptedSlot: acceptedSlots) {
+								try {
+									if (!taskSlotTable.markSlotActive(acceptedSlot)) {
+										// the slot is either free or releasing at the moment
+										final String message = "Could not mark slot " + acceptedSlot + " active.";
+										log.debug(message);
+										jobMasterGateway.failSlot(acceptedSlot, leaderId, new Exception(message));
+									}
+
+									// remove the assigned slots so that we can free the left overs
+									reservedSlots.remove(acceptedSlot);
+								} catch (SlotNotFoundException e) {
+									log.debug("Could not mark slot {} active.", acceptedSlot, e);
+									jobMasterGateway.failSlot(acceptedSlot, leaderId, e);
+								}
+							}
+
+							final Exception e = new Exception("The slot was rejected by the JobManager.");
+
+							for (AllocationID rejectedSlot: reservedSlots) {
+								freeSlot(rejectedSlot, e);
+							}
+						} else {
+							// discard the response since there is a new leader for the job
+							log.debug("Discard offer slot response since there is a new leader " +
+								"for the job {}.", jobId);
+						}
+					}
+				}, getMainThreadExecutor());
+
+				acceptedSlotsFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+					@Override
+					public Void apply(Throwable throwable) {
+						if (throwable instanceof TimeoutException) {
+							// We ran into a timeout. Try again.
+							offerSlotsToJobManager(jobId);
+						} else {
+							// We encountered an exception. Free the slots and return them to the RM.
+							for (AllocationID reservedSlot: reservedSlots) {
+								freeSlot(reservedSlot, throwable);
+							}
+						}
+
+						return null;
+					}
+				}, getMainThreadExecutor());
+			} else {
+				log.debug("There are no unassigned slots for the job {}.", jobId);
+			}
+		}
+	}
+
+	private void establishJobManagerConnection(JobID jobId, JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, JMTMRegistrationSuccess registrationSuccess) {
+		log.info("Establish JobManager connection for job {}.", jobId);
+
+		if (jobManagerTable.contains(jobId)) {
+			JobManagerConnection oldJobManagerConnection = jobManagerTable.get(jobId);
+
+			if (!oldJobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) {
+				closeJobManagerConnection(jobId);
+				jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort()));
+			}
+		} else {
+			jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort()));
+		}
 
-	private JobManagerConnection getJobManagerConnection(ResourceID jobManagerID) {
-		return jobManagerConnections.get(jobManagerID);
+		offerSlotsToJobManager(jobId);
 	}
 
+	/**
+	 * Closes the connection to the job manager of the given job.
+	 *
+	 * <p>All tasks of the job are failed, the job's active slots are marked inactive (slots
+	 * which cannot be marked inactive are freed), and finally the job manager connection is
+	 * removed from the job manager table and disassociated.
+	 *
+	 * @param jobId id of the job whose job manager connection is closed
+	 */
+	private void closeJobManagerConnection(JobID jobId) {
+		log.info("Close JobManager connection for job {}.", jobId);
+
+		// 1. fail tasks running under this JobID
+		Iterator<Task> tasks = taskSlotTable.getTasks(jobId);
+
+		while (tasks.hasNext()) {
+			tasks.next().failExternally(new Exception("JobManager responsible for " + jobId +
+				" lost the leadership."));
+		}
+
+		// 2. Move the active slots to state allocated (possible to time out again)
+		Iterator<AllocationID> activeSlots = taskSlotTable.getActiveSlots(jobId);
+
+		while (activeSlots.hasNext()) {
+			AllocationID activeSlot = activeSlots.next();
+
+			try {
+				if (!taskSlotTable.markSlotInactive(activeSlot, taskManagerConfiguration.getTimeout())) {
+					freeSlot(activeSlot, new Exception("Slot could not be marked inactive."));
+				}
+			} catch (SlotNotFoundException e) {
+				// report the slot which could not be found, not the job it belongs to
+				log.debug("Could not mark the slot {} inactive.", activeSlot, e);
+			}
+		}
+
+		// 3. Disassociate from the JobManager
+		JobManagerConnection jobManagerConnection = jobManagerTable.remove(jobId);
+
+		if (jobManagerConnection != null) {
+			try {
+				disassociateFromJobManager(jobManagerConnection);
+			} catch (IOException e) {
+				log.warn("Could not properly disassociate from JobManager {}.",
+					jobManagerConnection.getJobManagerGateway().getAddress(), e);
+			}
+		}
+	}
+
+	/**
+	 * Creates the {@link JobManagerConnection} for the given job manager leader, together with
+	 * the per-job-manager services handed to it (task manager actions, checkpoint responder,
+	 * library cache manager backed by a blob cache, result partition consumable notifier and
+	 * partition state checker).
+	 *
+	 * @param jobMasterGateway gateway to the job manager leader
+	 * @param jobManagerLeaderId leader id of the job manager
+	 * @param blobPort port of the job manager's blob server; combined with the gateway address
+	 *                 to build the blob cache's server address
+	 * @return the newly created job manager connection
+	 * @throws NullPointerException if the gateway or the leader id is {@code null}
+	 * @throws IllegalArgumentException if the blob port is out of range
+	 */
+	private JobManagerConnection associateWithJobManager(JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, int blobPort) {
+		Preconditions.checkNotNull(jobManagerLeaderId);
+		Preconditions.checkNotNull(jobMasterGateway);
+		// both bounds have to hold; with '||' the condition is true for every int value
+		Preconditions.checkArgument(blobPort > 0 && blobPort < MAX_BLOB_PORT, "Blob port is out of range.");
+
+		TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobManagerLeaderId, jobMasterGateway);
+
+		CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway);
+
+		InetSocketAddress address = new InetSocketAddress(jobMasterGateway.getAddress(), blobPort);
+
+		BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration());
+
+		LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
+			blobCache,
+			taskManagerConfiguration.getCleanupInterval());
+
+		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
+			jobManagerLeaderId,
+			jobMasterGateway,
+			getRpcService().getExecutor(),
+			taskManagerConfiguration.getTimeout());
+
+		PartitionStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobManagerLeaderId, jobMasterGateway);
+
+		return new JobManagerConnection(
+			jobMasterGateway,
+			jobManagerLeaderId,
+			taskManagerActions,
+			checkpointResponder,
+			libraryCacheManager,
+			resultPartitionConsumableNotifier,
+			partitionStateChecker);
+	}
+
+	/**
+	 * Tells the job manager behind the given connection that this task manager disconnects
+	 * and shuts down the connection's library cache manager.
+	 *
+	 * @param jobManagerConnection connection to tear down, must not be {@code null}
+	 * @throws IOException declared for failures while tearing down the connection
+	 */
+	private void disassociateFromJobManager(JobManagerConnection jobManagerConnection) throws IOException {
+		final JobManagerConnection connection = Preconditions.checkNotNull(jobManagerConnection);
+
+		connection.getJobManagerGateway().disconnectTaskManager(getResourceID());
+		connection.getLibraryCacheManager().shutdown();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Internal task methods
+	// ------------------------------------------------------------------------
+
 	private void failTask(final ExecutionAttemptID executionAttemptID, final Throwable cause) {
 		final Task task = taskSlotTable.getTask(executionAttemptID);
 
@@ -571,94 +872,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
-	private void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLeaderId) {
-		if (resourceManagerConnection != null) {
-			if (newLeaderAddress != null) {
-				// the resource manager switched to a new leader
-				log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
-					resourceManagerConnection.getTargetAddress(), newLeaderAddress);
-			}
-			else {
-				// address null means that the current leader is lost without a new leader being there, yet
-				log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
-					resourceManagerConnection.getTargetAddress());
-			}
-
-			// drop the current connection or connection attempt
-			if (resourceManagerConnection != null) {
-				resourceManagerConnection.close();
-				resourceManagerConnection = null;
-			}
-		}
-
-		unconfirmedFreeSlots.clear();
-
-		// establish a connection to the new leader
-		if (newLeaderAddress != null) {
-			log.info("Attempting to register at ResourceManager {}", newLeaderAddress);
-			resourceManagerConnection =
-				new TaskExecutorToResourceManagerConnection(
-					log,
-					this,
-					newLeaderAddress,
-					newLeaderId,
-					getMainThreadExecutor());
-			resourceManagerConnection.start();
-		}
-	}
-
-	private JobManagerConnection associateWithJobManager(UUID jobMasterLeaderId,
-			JobMasterGateway jobMasterGateway, int blobPort)
-	{
-		Preconditions.checkNotNull(jobMasterLeaderId);
-		Preconditions.checkNotNull(jobMasterGateway);
-		Preconditions.checkArgument(blobPort > 0 || blobPort <= 65535, "Blob port is out of range.");
-
-		TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterLeaderId, jobMasterGateway);
-
-		CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway);
-
-		InetSocketAddress address = new InetSocketAddress(jobMasterGateway.getAddress(), blobPort);
-
-		BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration());
-
-		LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
-			blobCache,
-			taskManagerConfiguration.getCleanupInterval());
-
-		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
-				jobMasterLeaderId,
-				jobMasterGateway,
-				getRpcService().getExecutor(),
-				taskManagerConfiguration.getTimeout());
-
-		PartitionStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterLeaderId, jobMasterGateway);
-
-		return new JobManagerConnection(
-				jobMasterLeaderId,
-				jobMasterGateway,
-				taskManagerActions,
-				checkpointResponder,
-				libraryCacheManager,
-				resultPartitionConsumableNotifier,
-				partitionStateChecker);
-	}
-
-	private void disassociateFromJobManager(JobManagerConnection jobManagerConnection) throws IOException {
-		if (jobManagerConnection != null) {
-			JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway();
-
-			jobManagerGateway.disconnectTaskManager(getResourceID());
-
-			jobManagerConnection.getLibraryCacheManager().shutdown();
-		}
-	}
-
-	private void freeSlot(AllocationID allocationId) {
+	private void freeSlot(AllocationID allocationId, Throwable cause) {
 		Preconditions.checkNotNull(allocationId);
 
 		try {
-			int freedSlotIndex = taskSlotTable.freeSlot(allocationId);
+			int freedSlotIndex = taskSlotTable.freeSlot(allocationId, cause);
 
 			if (freedSlotIndex != -1 && isConnectedToResourceManager()) {
 				// the slot was freed. Tell the RM about it
@@ -674,21 +892,35 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
+	/**
+	 * Frees the slot with the given allocation id, using a generic cause.
+	 *
+	 * @param allocationId allocation id of the slot to free
+	 */
+	private void freeSlot(AllocationID allocationId) {
+		freeSlot(allocationId, new Exception("The slot " + allocationId + " is being freed."));
+	}
+
 	private void timeoutSlot(AllocationID allocationId, UUID ticket) {
 		Preconditions.checkNotNull(allocationId);
 		Preconditions.checkNotNull(ticket);
 
 		if (taskSlotTable.isValidTimeout(allocationId, ticket)) {
-			freeSlot(allocationId);
+			freeSlot(allocationId, new Exception("The slot " + allocationId + " has timed out."));
 		} else {
 			log.debug("Received an invalid timeout for allocation id {} with ticket {}.", allocationId, ticket);
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  Internal utility methods
+	// ------------------------------------------------------------------------
+
 	private boolean isConnectedToResourceManager() {
 		return (resourceManagerConnection != null && resourceManagerConnection.isConnected());
 	}
 
+	/**
+	 * Checks whether the job manager connection registered for the given job belongs to the
+	 * given leader.
+	 *
+	 * @param jobId id of the job whose connection is checked
+	 * @param leaderId leader id the registered connection is expected to have
+	 * @return {@code true} if a connection is registered for the job and its leader id equals
+	 *         the given one; {@code false} otherwise
+	 */
+	private boolean isJobManagerConnectionValid(JobID jobId, UUID leaderId) {
+		final JobManagerConnection registeredConnection = jobManagerTable.get(jobId);
+
+		if (registeredConnection == null) {
+			return false;
+		}
+
+		return registeredConnection.getLeaderId().equals(leaderId);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Properties
 	// ------------------------------------------------------------------------
@@ -737,11 +969,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		return resourceManagerConnection;
 	}
 
-	@VisibleForTesting
-	public void addUnconfirmedFreeSlotNotification(SlotID slotID) {
-		unconfirmedFreeSlots.add(slotID);
-	}
-
 	// ------------------------------------------------------------------------
 	//  Utility classes
 	// ------------------------------------------------------------------------
@@ -767,6 +994,44 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
+	/**
+	 * {@link JobLeaderListener} which hands job leader notifications over to the enclosing
+	 * task executor: leadership changes are scheduled via {@code runAsync}, errors are
+	 * forwarded to {@code onFatalErrorAsync}.
+	 */
+	private final class JobLeaderListenerImpl implements JobLeaderListener {
+
+		@Override
+		public void jobManagerGainedLeadership(
+			final JobID jobId,
+			final JobMasterGateway jobManagerGateway,
+			final UUID jobLeaderId,
+			final JMTMRegistrationSuccess registrationMessage) {
+			// set up the connection to the new job leader asynchronously
+			final Runnable establishConnection = new Runnable() {
+				@Override
+				public void run() {
+					establishJobManagerConnection(
+						jobId,
+						jobManagerGateway,
+						jobLeaderId,
+						registrationMessage);
+				}
+			};
+
+			runAsync(establishConnection);
+		}
+
+		@Override
+		public void jobManagerLostLeadership(final JobID jobId, final UUID jobLeaderId) {
+			log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobLeaderId);
+
+			// tear down the connection to the old job leader asynchronously
+			final Runnable closeConnection = new Runnable() {
+				@Override
+				public void run() {
+					closeJobManagerConnection(jobId);
+				}
+			};
+
+			runAsync(closeConnection);
+		}
+
+		@Override
+		public void handleError(Throwable throwable) {
+			onFatalErrorAsync(throwable);
+		}
+	}
+
 	private final class TaskManagerActionsImpl implements TaskManagerActions {
 		private final UUID jobMasterLeaderId;
 		private final JobMasterGateway jobMasterGateway;
@@ -830,5 +1095,4 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			});
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index f062b96..1ffc407 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -30,9 +30,9 @@ import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskmanager.Task;
 
-import java.util.Collection;
 import java.util.UUID;
 
 /**
@@ -43,28 +43,31 @@ public interface TaskExecutorGateway extends RpcGateway {
 	/**
 	 * Requests a slot from the TaskManager
 	 *
-	 * @param slotID slot id for the request
-	 * @param allocationID id for the request
-	 * @param resourceManagerLeaderID current leader id of the ResourceManager
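+	 * @param slotId slot id for the request
+	 * @param jobId id of the job for which the slot is requested
+	 * @param allocationId id for the request
+	 * @param targetAddress address of the job manager requesting the slot
+	 * @param resourceManagerLeaderId current leader id of the ResourceManager
+	 * @param timeout of the slot request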
+	 * @throws SlotAllocationException if the slot allocation fails
 	 * @return answer to the slot request
 	 */
 	Future<TMSlotRequestReply> requestSlot(
-		SlotID slotID,
-		AllocationID allocationID,
-		UUID resourceManagerLeaderID,
+		SlotID slotId,
+		JobID jobId,
+		AllocationID allocationId,
+		String targetAddress,
+		UUID resourceManagerLeaderId,
 		@RpcTimeout Time timeout);
 
 	/**
 	 * Submit a {@link Task} to the {@link TaskExecutor}.
 	 *
 	 * @param tdd describing the task to submit
-	 * @param jobManagerID identifying the submitting JobManager
+	 * @param leaderId of the job leader
 	 * @param timeout of the submit operation
 	 * @return Future acknowledge of the successful operation
 	 */
 	Future<Acknowledge> submitTask(
 		TaskDeploymentDescriptor tdd,
-		ResourceID jobManagerID,
+		UUID leaderId,
 		@RpcTimeout Time timeout);
 
 	/**
@@ -74,7 +77,7 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param partitionInfos telling where the partition can be retrieved from
 	 * @return Future acknowledge if the partitions have been successfully updated
 	 */
-	Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Collection<PartitionInfo> partitionInfos);
+	Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos);
 
 	/**
 	 * Fail all intermediate result partitions of the given task.

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 2dbd9eb..53f030e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -68,11 +68,16 @@ public class TaskExecutorToResourceManagerConnection
 
 	@Override
 	protected void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
+		log.info("Successful registration at resource manager {} under registration id {}.",
+			getTargetAddress(), success.getRegistrationId());
+
 		registrationId = success.getRegistrationId();
 	}
 
 	@Override
 	protected void onRegistrationFailure(Throwable failure) {
+		log.info("Failed to register at resource manager {}.", getTargetAddress(), failure);
+
 		taskExecutor.onFatalErrorAsync(failure);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index ca1d2ce..9f78682 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -99,6 +99,8 @@ public class TaskManagerRunner implements FatalErrorHandler {
 			taskManagerServices.getBroadcastVariableManager(),
 			taskManagerServices.getFileCache(),
 			taskManagerServices.getTaskSlotTable(),
+			taskManagerServices.getJobManagerTable(),
+			taskManagerServices.getJobLeaderService(),
 			this);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index c1728b4..7575ba3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -73,6 +73,8 @@ public class TaskManagerServices {
 	private final BroadcastVariableManager broadcastVariableManager;
 	private final FileCache fileCache;
 	private final TaskSlotTable taskSlotTable;
+	private final JobManagerTable jobManagerTable;
+	private final JobLeaderService jobLeaderService;
 
 	private TaskManagerServices(
 		TaskManagerLocation taskManagerLocation,
@@ -83,7 +85,9 @@ public class TaskManagerServices {
 		TaskManagerMetricGroup taskManagerMetricGroup,
 		BroadcastVariableManager broadcastVariableManager,
 		FileCache fileCache,
-		TaskSlotTable taskSlotTable) {
+		TaskSlotTable taskSlotTable,
+		JobManagerTable jobManagerTable,
+		JobLeaderService jobLeaderService) {
 
 		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
 		this.memoryManager = Preconditions.checkNotNull(memoryManager);
@@ -94,6 +98,8 @@ public class TaskManagerServices {
 		this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager);
 		this.fileCache = Preconditions.checkNotNull(fileCache);
 		this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
+		this.jobManagerTable = Preconditions.checkNotNull(jobManagerTable);
+		this.jobLeaderService = Preconditions.checkNotNull(jobLeaderService);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -136,6 +142,14 @@ public class TaskManagerServices {
 		return taskSlotTable;
 	}
 
+	public JobManagerTable getJobManagerTable() {
+		return jobManagerTable;
+	}
+
+	public JobLeaderService getJobLeaderService() {
+		return jobLeaderService;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Static factory methods for task manager services
 	// --------------------------------------------------------------------------------------------
@@ -190,6 +204,10 @@ public class TaskManagerServices {
 		final TimerService<AllocationID> timerService = new TimerService<>(new ScheduledThreadPoolExecutor(1));
 
 		final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);
+
+		final JobManagerTable jobManagerTable = new JobManagerTable();
+
+		final JobLeaderService jobLeaderService = new JobLeaderService(resourceID);
 		
 		return new TaskManagerServices(
 			taskManagerLocation,
@@ -200,7 +218,9 @@ public class TaskManagerServices {
 			taskManagerMetricGroup,
 			broadcastVariableManager,
 			fileCache,
-			taskSlotTable);
+			taskSlotTable,
+			jobManagerTable,
+			jobLeaderService);
 	}
 
 	/**