You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/24 14:49:48 UTC

[4/4] flink git commit: [FLINK-9427] Fix registration and request slot race condition in TaskExecutor

[FLINK-9427] Fix registration and request slot race condition in TaskExecutor

This commit fixes a race condition between the TaskExecutor and the ResourceManager. Before,
it could happen that the ResourceManager sent a requestSlots message before the TaskExecutor
registration was completed. Due to this, the TaskExecutor did not have all the information it
needed to accept task submissions.

The problem was that the TaskExecutor sent the SlotReport at registration time. Due to this,
the SlotManager could already assign these slots to pending slot requests. With this commit, the
registration protocol changes such that the TaskExecutor first registers at the ResourceManager
and only after completing this step, it will announce the available slots to the SlotManager.

This closes #6067.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47dc6997
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47dc6997
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47dc6997

Branch: refs/heads/master
Commit: 47dc69970996aa2a57c0dbfde866d3dac6d53001
Parents: a5966fd
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 23 18:50:27 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 24 16:45:15 2018 +0200

----------------------------------------------------------------------
 .../MesosResourceManagerTest.java               |   9 +-
 .../flink/runtime/jobmaster/JobMaster.java      |   3 +-
 .../RegistrationConnectionListener.java         |   5 +-
 .../resourcemanager/ResourceManager.java        |  18 +-
 .../resourcemanager/ResourceManagerGateway.java |  16 +-
 .../EstablishedResourceManagerConnection.java   |  62 +++
 .../runtime/taskexecutor/TaskExecutor.java      | 187 +++++----
 ...TaskExecutorToResourceManagerConnection.java |  56 +--
 .../clusterframework/ResourceManagerTest.java   |   3 -
 .../ResourceManagerTaskExecutorTest.java        |  14 +-
 .../resourcemanager/ResourceManagerTest.java    |   2 -
 .../slotmanager/SlotManagerTest.java            |  63 +++
 .../slotmanager/TestingResourceActions.java     |  44 +++
 .../utils/TestingResourceManagerGateway.java    |  50 ++-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 385 ++++++++++++++-----
 .../TestingTaskExecutorGateway.java             |  10 +-
 .../TestingTaskExecutorGatewayBuilder.java      |  14 +-
 .../flink/yarn/YarnConfigurationITCase.java     |  16 +-
 .../flink/yarn/YarnResourceManagerTest.java     |  17 +-
 19 files changed, 730 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/47dc6997/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 91647c0..56d2fbd 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -665,9 +666,15 @@ public class MesosResourceManagerTest extends TestLogger {
 			final HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
 			// send registration message
 			CompletableFuture<RegistrationResponse> successfulFuture =
-				resourceManager.registerTaskExecutor(task1Executor.address, task1Executor.resourceID, slotReport, dataPort, hardwareDescription, timeout);
+				resourceManager.registerTaskExecutor(task1Executor.address, task1Executor.resourceID, dataPort, hardwareDescription, timeout);
 			RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+			final TaskExecutorRegistrationSuccess registrationResponse = (TaskExecutorRegistrationSuccess) response;
+
+			final CompletableFuture<Acknowledge> initialSlotReportFuture = resourceManager.sendSlotReport(task1Executor.resourceID, registrationResponse.getRegistrationId(), slotReport, timeout);
+
+			// check for errors
+			initialSlotReportFuture.get();
 
 			// verify the internal state
 			assertThat(resourceManager.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));

http://git-wip-us.apache.org/repos/asf/flink/blob/47dc6997/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index aad81a1..f96e0ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1557,7 +1557,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		@Override
 		protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
 			runAsync(() -> {
-				// filter out replace connections
+				// filter out outdated connections
+				//noinspection ObjectEquality
 				if (this == resourceManagerConnection) {
 					establishResourceManagerConnection(success);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/47dc6997/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java
index 360f982..3eb7243 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java
@@ -22,14 +22,15 @@ package org.apache.flink.runtime.registration;
  * Classes which want to be notified about the registration result by the {@link RegisteredRpcConnection}
  * have to implement this interface.
  */
-public interface RegistrationConnectionListener<Success extends RegistrationResponse.Success> {
+public interface RegistrationConnectionListener<T extends RegisteredRpcConnection<?, ?, S>, S extends RegistrationResponse.Success> {
 
 	/**
 	 * This method is called by the {@link RegisteredRpcConnection} when the registration is success.
 	 *
 	 * @param success The concrete response information for successful registration.
+	 * @param connection The instance which established the connection
 	 */
-	void onRegistrationSuccess(Success success);
+	void onRegistrationSuccess(T connection, S success);
 
 	/**
 	 * This method is called by the {@link RegisteredRpcConnection} when the registration fails.

http://git-wip-us.apache.org/repos/asf/flink/blob/47dc6997/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index af736b2..6e5c824 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -338,7 +338,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	public CompletableFuture<RegistrationResponse> registerTaskExecutor(
 			final String taskExecutorAddress,
 			final ResourceID taskExecutorResourceId,
-			final SlotReport slotReport,
 			final int dataPort,
 			final HardwareDescription hardwareDescription,
 			final Time timeout) {
@@ -354,7 +353,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 						taskExecutorGateway,
 						taskExecutorAddress,
 						taskExecutorResourceId,
-						slotReport,
 						dataPort,
 						hardwareDescription);
 				}
@@ -363,6 +361,18 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	}
 
 	@Override
+	public CompletableFuture<Acknowledge> sendSlotReport(ResourceID taskManagerResourceId, InstanceID taskManagerRegistrationId, SlotReport slotReport, Time timeout) {
+		final WorkerRegistration<WorkerType> workerTypeWorkerRegistration = taskExecutors.get(taskManagerResourceId);
+
+		if (workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) {
+			slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport);
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		} else {
+			return FutureUtils.completedExceptionally(new ResourceManagerException(String.format("Unknown TaskManager registration id %s.", taskManagerRegistrationId)));
+		}
+	}
+
+	@Override
 	public void heartbeatFromTaskManager(final ResourceID resourceID, final SlotReport slotReport) {
 		taskManagerHeartbeatManager.receiveHeartbeat(resourceID, slotReport);
 	}
@@ -669,7 +679,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	 * @param taskExecutorGateway to communicate with the registering TaskExecutor
 	 * @param taskExecutorAddress address of the TaskExecutor
 	 * @param taskExecutorResourceId ResourceID of the TaskExecutor
-	 * @param slotReport initial slot report from the TaskExecutor
 	 * @param dataPort port used for data transfer
 	 * @param hardwareDescription of the registering TaskExecutor
 	 * @return RegistrationResponse
@@ -678,7 +687,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 			TaskExecutorGateway taskExecutorGateway,
 			String taskExecutorAddress,
 			ResourceID taskExecutorResourceId,
-			SlotReport slotReport,
 			int dataPort,
 			HardwareDescription hardwareDescription) {
 		WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
@@ -702,8 +710,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 
 			taskExecutors.put(taskExecutorResourceId, registration);
 
-			slotManager.registerTaskManager(registration, slotReport);
-
 			taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
 				@Override
 				public void receiveHeartbeat(ResourceID resourceID, Void payload) {

http://git-wip-us.apache.org/repos/asf/flink/blob/47dc6997/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index bd282d6..ea79650 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -92,7 +92,6 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
 	 *
 	 * @param taskExecutorAddress The address of the TaskExecutor that registers
 	 * @param resourceId The resource ID of the TaskExecutor that registers
-	 * @param slotReport The slot report containing free and allocated task slots
 	 * @param dataPort port used for data communication between TaskExecutors
 	 * @param hardwareDescription of the registering TaskExecutor
 	 * @param timeout The timeout for the response.
@@ -102,12 +101,25 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
 	CompletableFuture<RegistrationResponse> registerTaskExecutor(
 		String taskExecutorAddress,
 		ResourceID resourceId,
-		SlotReport slotReport,
 		int dataPort,
 		HardwareDescription hardwareDescription,
 		@RpcTimeout Time timeout);
 
 	/**
+	 * Sends the given {@link SlotReport} to the ResourceManager.
+	 *
+	 * @param taskManagerRegistrationId id identifying the sending TaskManager
+	 * @param slotReport which is sent to the ResourceManager
+	 * @param timeout for the operation
+	 * @return Future which is completed with {@link Acknowledge} once the slot report has been received.
+	 */
+	CompletableFuture<Acknowledge> sendSlotReport(
+		ResourceID taskManagerResourceId,
+		InstanceID taskManagerRegistrationId,
+		SlotReport slotReport,
+		@RpcTimeout Time timeout);
+
+	/**
 	 * Sent by the TaskExecutor to notify the ResourceManager that a slot has become available.
 	 *
 	 * @param instanceId TaskExecutor's instance id

http://git-wip-us.apache.org/repos/asf/flink/blob/47dc6997/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/EstablishedResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/EstablishedResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/EstablishedResourceManagerConnection.java
new file mode 100644
index 0000000..5fb2c1c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/EstablishedResourceManagerConnection.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Container for the resource manager connection instances used by the
+ * {@link TaskExecutor}.
+ */
+class EstablishedResourceManagerConnection {
+
+	@Nonnull
+	private final ResourceManagerGateway resourceManagerGateway;
+
+	@Nonnull
+	private final ResourceID resourceManagerResourceId;
+
+	@Nonnull
+	private final InstanceID taskExecutorRegistrationId;
+
+	EstablishedResourceManagerConnection(@Nonnull ResourceManagerGateway resourceManagerGateway, @Nonnull ResourceID resourceManagerResourceId, @Nonnull InstanceID taskExecutorRegistrationId) {
+		this.resourceManagerGateway = resourceManagerGateway;
+		this.resourceManagerResourceId = resourceManagerResourceId;
+		this.taskExecutorRegistrationId = taskExecutorRegistrationId;
+	}
+
+	@Nonnull
+	public ResourceManagerGateway getResourceManagerGateway() {
+		return resourceManagerGateway;
+	}
+
+	@Nonnull
+	public ResourceID getResourceManagerResourceId() {
+		return resourceManagerResourceId;
+	}
+
+	@Nonnull
+	public InstanceID getTaskExecutorRegistrationId() {
+		return taskExecutorRegistrationId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47dc6997/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index e87c44d..dda2688 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
@@ -56,6 +57,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.ResourceManagerAddress;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -82,6 +84,7 @@ import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutExcep
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
 import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
+import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException;
 import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider;
@@ -167,10 +170,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 	/** The network component in the task manager. */
 	private final NetworkEnvironment networkEnvironment;
 
-	// --------- resource manager --------
-
-	private TaskExecutorToResourceManagerConnection resourceManagerConnection;
-
 	// --------- job manager connections -----------
 
 	private final Map<ResourceID, JobManagerConnection> jobManagerConnections;
@@ -191,6 +190,17 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 	private FileCache fileCache;
 
+	// --------- resource manager --------
+
+	@Nullable
+	private ResourceManagerAddress resourceManagerAddress;
+
+	@Nullable
+	private EstablishedResourceManagerConnection establishedResourceManagerConnection;
+
+	@Nullable
+	private TaskExecutorToResourceManagerConnection resourceManagerConnection;
+
 	@Nullable
 	private UUID currentRegistrationTimeoutId;
 
@@ -242,6 +252,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		this.hardwareDescription = HardwareDescription.extractFromSystem(
 			taskExecutorServices.getMemoryManager().getMemorySize());
 
+		this.resourceManagerAddress = null;
+		this.resourceManagerConnection = null;
 		this.currentRegistrationTimeoutId = null;
 	}
 
@@ -732,19 +744,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			allocationId, jobId, resourceManagerId);
 
 		try {
-			if (resourceManagerConnection == null) {
-				final String message = "TaskManager is not connected to a resource manager.";
+			if (!isConnectedToResourceManager(resourceManagerId)) {
+				final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
 				log.debug(message);
-				throw new SlotAllocationException(message);
-			}
-
-			if (!Objects.equals(resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) {
-				final String message = "The leader id " + resourceManagerId +
-					" does not match with the leader id of the connected resource manager " +
-					resourceManagerConnection.getTargetLeaderId() + '.';
-
-				log.debug(message);
-				throw new SlotAllocationException(message);
+				throw new TaskManagerException(message);
 			}
 
 			if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
@@ -788,8 +791,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 					throw new SlotAllocationException("Could not add job to job leader service.", e);
 				}
 			}
-		} catch (SlotAllocationException slotAllocationException) {
-			return FutureUtils.completedExceptionally(slotAllocationException);
+		} catch (TaskManagerException taskManagerException) {
+			return FutureUtils.completedExceptionally(taskManagerException);
 		}
 
 		return CompletableFuture.completedFuture(Acknowledge.get());
@@ -855,7 +858,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 	@Override
 	public void disconnectResourceManager(Exception cause) {
-		closeResourceManagerConnection(cause);
+		reconnectToResourceManager(cause);
 	}
 
 	// ======================================================================
@@ -867,56 +870,76 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 	// ------------------------------------------------------------------------
 
 	private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
-		if (resourceManagerConnection != null) {
-			if (newLeaderAddress != null) {
-				// the resource manager switched to a new leader
-				log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
-					resourceManagerConnection.getTargetAddress(), newLeaderAddress);
-			}
-			else {
-				// address null means that the current leader is lost without a new leader being there, yet
-				log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
-					resourceManagerConnection.getTargetAddress());
-			}
+		resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
+		reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
+	}
 
-			// drop the current connection or connection attempt
-			closeResourceManagerConnection(
-				new FlinkException("New ResourceManager leader found under: " + newLeaderAddress +
-					'(' + newResourceManagerId + ')'));
+	@Nullable
+	private ResourceManagerAddress createResourceManagerAddress(@Nullable String newLeaderAddress, @Nullable ResourceManagerId newResourceManagerId) {
+		if (newLeaderAddress == null) {
+			return null;
+		} else {
+			assert(newResourceManagerId != null);
+			return new ResourceManagerAddress(newLeaderAddress, newResourceManagerId);
 		}
+	}
 
-		// establish a connection to the new leader
-		if (newLeaderAddress != null) {
-			createResourceManagerConnection(newLeaderAddress, newResourceManagerId);
+	private void reconnectToResourceManager(Exception cause) {
+		closeResourceManagerConnection(cause);
+		tryConnectToResourceManager();
+	}
+
+	private void tryConnectToResourceManager() {
+		if (resourceManagerAddress != null) {
+			connectToResourceManager();
 		}
 	}
 
-	private void createResourceManagerConnection(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
-		log.info("Attempting to register at ResourceManager {} ({})", newLeaderAddress, newResourceManagerId);
+	private void connectToResourceManager() {
+		assert(resourceManagerAddress != null);
+		assert(establishedResourceManagerConnection == null);
+		assert(resourceManagerConnection == null);
+
+		log.info("Connecting to ResourceManager {}.", resourceManagerAddress);
+
 		resourceManagerConnection =
 			new TaskExecutorToResourceManagerConnection(
 				log,
 				getRpcService(),
 				getAddress(),
 				getResourceID(),
-				taskSlotTable.createSlotReport(getResourceID()),
 				taskManagerLocation.dataPort(),
 				hardwareDescription,
-				newLeaderAddress,
-				newResourceManagerId,
+				resourceManagerAddress.getAddress(),
+				resourceManagerAddress.getResourceManagerId(),
 				getMainThreadExecutor(),
 				new ResourceManagerRegistrationListener());
 		resourceManagerConnection.start();
 	}
 
 	private void establishResourceManagerConnection(
+			ResourceManagerGateway resourceManagerGateway,
 			ResourceID resourceManagerResourceId,
+			InstanceID taskExecutorRegistrationId,
 			ClusterInformation clusterInformation) {
+
+		final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
+			getResourceID(),
+			taskExecutorRegistrationId,
+			taskSlotTable.createSlotReport(getResourceID()),
+			taskManagerConfiguration.getTimeout());
+
+		slotReportResponseFuture.whenCompleteAsync(
+			(acknowledge, throwable) -> {
+				if (throwable != null) {
+					reconnectToResourceManager(new TaskManagerException("Failed to send initial slot report to ResourceManager.", throwable));
+				}
+			}, getMainThreadExecutor());
+
 		// monitor the resource manager as heartbeat target
 		resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<SlotReport>() {
 			@Override
 			public void receiveHeartbeat(ResourceID resourceID, SlotReport slotReport) {
-				ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
 				resourceManagerGateway.heartbeatFromTaskManager(resourceID, slotReport);
 			}
 
@@ -933,25 +956,35 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		blobCacheService.setBlobServerAddress(blobServerAddress);
 
+		establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
+			resourceManagerGateway,
+			resourceManagerResourceId,
+			taskExecutorRegistrationId);
+
 		stopRegistrationTimeout();
 	}
 
 	private void closeResourceManagerConnection(Exception cause) {
-		if (resourceManagerConnection != null) {
-
-			if (resourceManagerConnection.isConnected()) {
-				if (log.isDebugEnabled()) {
-					log.debug("Close ResourceManager connection {}.",
-						resourceManagerConnection.getResourceManagerId(), cause);
-				} else {
-					log.info("Close ResourceManager connection {}.",
-						resourceManagerConnection.getResourceManagerId());
-				}
-				resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
+		if (establishedResourceManagerConnection != null) {
+			final ResourceID resourceManagerResourceId = establishedResourceManagerConnection.getResourceManagerResourceId();
 
-				ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
-				resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+			if (log.isDebugEnabled()) {
+				log.debug("Close ResourceManager connection {}.",
+					resourceManagerResourceId, cause);
 			} else {
+				log.info("Close ResourceManager connection {}.",
+					resourceManagerResourceId);
+			}
+			resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerResourceId);
+
+			ResourceManagerGateway resourceManagerGateway = establishedResourceManagerConnection.getResourceManagerGateway();
+			resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+
+			establishedResourceManagerConnection = null;
+		}
+
+		if (resourceManagerConnection != null) {
+			if (!resourceManagerConnection.isConnected()) {
 				if (log.isDebugEnabled()) {
 					log.debug("Terminating registration attempts towards ResourceManager {}.",
 						resourceManagerConnection.getTargetAddress(), cause);
@@ -1327,10 +1360,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 				if (isConnectedToResourceManager()) {
 					// the slot was freed. Tell the RM about it
-					ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+					ResourceManagerGateway resourceManagerGateway = establishedResourceManagerConnection.getResourceManagerGateway();
 
 					resourceManagerGateway.notifySlotAvailable(
-						resourceManagerConnection.getRegistrationId(),
+						establishedResourceManagerConnection.getTaskExecutorRegistrationId(),
 						new SlotID(getResourceID(), slotIndex),
 						allocationId);
 				}
@@ -1375,7 +1408,11 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 	// ------------------------------------------------------------------------
 
 	private boolean isConnectedToResourceManager() {
-		return (resourceManagerConnection != null && resourceManagerConnection.isConnected());
+		return establishedResourceManagerConnection != null;
+	}
+
+	private boolean isConnectedToResourceManager(ResourceManagerId resourceManagerId) {
+		return establishedResourceManagerConnection != null && resourceManagerAddress != null && resourceManagerAddress.getResourceManagerId().equals(resourceManagerId);
 	}
 
 	private boolean isJobManagerConnectionValid(JobID jobId, JobMasterId jobMasterId) {
@@ -1478,16 +1515,27 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		}
 	}
 
-	private final class ResourceManagerRegistrationListener implements RegistrationConnectionListener<TaskExecutorRegistrationSuccess> {
+	private final class ResourceManagerRegistrationListener implements RegistrationConnectionListener<TaskExecutorToResourceManagerConnection, TaskExecutorRegistrationSuccess> {
 
 		@Override
-		public void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
+		public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection, TaskExecutorRegistrationSuccess success) {
 			final ResourceID resourceManagerId = success.getResourceManagerId();
+			final InstanceID taskExecutorRegistrationId = success.getRegistrationId();
 			final ClusterInformation clusterInformation = success.getClusterInformation();
+			final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway();
 
 			runAsync(
-				() -> establishResourceManagerConnection(resourceManagerId, clusterInformation)
-			);
+				() -> {
+					// filter out outdated connections
+					//noinspection ObjectEquality
+					if (resourceManagerConnection == connection) {
+						establishResourceManagerConnection(
+							resourceManagerGateway,
+							resourceManagerId,
+							taskExecutorRegistrationId,
+							clusterInformation);
+					}
+				});
 		}
 
 		@Override
@@ -1598,18 +1646,11 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		public void notifyHeartbeatTimeout(final ResourceID resourceId) {
 			runAsync(() -> {
 				// first check whether the timeout is still valid
-				if (resourceManagerConnection != null && resourceManagerConnection.getResourceManagerId().equals(resourceId)) {
+				if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceId().equals(resourceId)) {
 					log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
 
-					final String resourceManagerAddress = resourceManagerConnection.getTargetAddress();
-					final ResourceManagerId resourceManagerId = resourceManagerConnection.getTargetLeaderId();
-
-					closeResourceManagerConnection(
-						new TimeoutException(
-							"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
-
-					// try to reconnect to old ResourceManager
-					createResourceManagerConnection(resourceManagerAddress, resourceManagerId);
+					reconnectToResourceManager(new TaskManagerException(
+						String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId)));
 				} else {
 					log.debug("Received heartbeat timeout for outdated ResourceManager id {}. Ignoring the timeout.", resourceId);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/47dc6997/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 1288009..9e1de52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -21,16 +21,14 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationConnectionListener;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rpc.RpcService;
 
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
 import java.util.concurrent.CompletableFuture;
@@ -50,43 +48,34 @@ public class TaskExecutorToResourceManagerConnection
 
 	private final ResourceID taskManagerResourceId;
 
-	private final SlotReport slotReport;
-
 	private final int dataPort;
 
 	private final HardwareDescription hardwareDescription;
 
-	private final RegistrationConnectionListener<TaskExecutorRegistrationSuccess> registrationListener;
-
-	private InstanceID registrationId;
-
-	private ResourceID resourceManagerResourceId;
+	private final RegistrationConnectionListener<TaskExecutorToResourceManagerConnection, TaskExecutorRegistrationSuccess> registrationListener;
 
 	public TaskExecutorToResourceManagerConnection(
 			Logger log,
 			RpcService rpcService,
 			String taskManagerAddress,
 			ResourceID taskManagerResourceId,
-			SlotReport slotReport,
 			int dataPort,
 			HardwareDescription hardwareDescription,
 			String resourceManagerAddress,
 			ResourceManagerId resourceManagerId,
 			Executor executor,
-			RegistrationConnectionListener<TaskExecutorRegistrationSuccess> registrationListener) {
+			RegistrationConnectionListener<TaskExecutorToResourceManagerConnection, TaskExecutorRegistrationSuccess> registrationListener) {
 
 		super(log, resourceManagerAddress, resourceManagerId, executor);
 
-		this.rpcService = Preconditions.checkNotNull(rpcService);
-		this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress);
-		this.taskManagerResourceId = Preconditions.checkNotNull(taskManagerResourceId);
-		this.slotReport = Preconditions.checkNotNull(slotReport);
+		this.rpcService = checkNotNull(rpcService);
+		this.taskManagerAddress = checkNotNull(taskManagerAddress);
+		this.taskManagerResourceId = checkNotNull(taskManagerResourceId);
 		this.dataPort = dataPort;
-		this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
-		this.registrationListener = Preconditions.checkNotNull(registrationListener);
+		this.hardwareDescription = checkNotNull(hardwareDescription);
+		this.registrationListener = checkNotNull(registrationListener);
 	}
 
-
 	@Override
 	protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway, TaskExecutorRegistrationSuccess> generateRegistration() {
 		return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
@@ -96,7 +85,6 @@ public class TaskExecutorToResourceManagerConnection
 			getTargetLeaderId(),
 			taskManagerAddress,
 			taskManagerResourceId,
-			slotReport,
 			dataPort,
 			hardwareDescription);
 	}
@@ -106,9 +94,7 @@ public class TaskExecutorToResourceManagerConnection
 		log.info("Successful registration at resource manager {} under registration id {}.",
 			getTargetAddress(), success.getRegistrationId());
 
-		registrationId = success.getRegistrationId();
-		resourceManagerResourceId = success.getResourceManagerId();
-		registrationListener.onRegistrationSuccess(success);
+		registrationListener.onRegistrationSuccess(this, success);
 	}
 
 	@Override
@@ -118,21 +104,6 @@ public class TaskExecutorToResourceManagerConnection
 		registrationListener.onRegistrationFailure(failure);
 	}
 
-	/**
-	 * Gets the ID under which the TaskExecutor is registered at the ResourceManager.
-	 * This returns null until the registration is completed.
-	 */
-	public InstanceID getRegistrationId() {
-		return registrationId;
-	}
-
-	/**
-	 * Gets the unique id of ResourceManager, that is returned when registration success.
-	 */
-	public ResourceID getResourceManagerId() {
-		return resourceManagerResourceId;
-	}
-
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
@@ -141,10 +112,8 @@ public class TaskExecutorToResourceManagerConnection
 			extends RetryingRegistration<ResourceManagerId, ResourceManagerGateway, TaskExecutorRegistrationSuccess> {
 
 		private final String taskExecutorAddress;
-		
-		private final ResourceID resourceID;
 
-		private final SlotReport slotReport;
+		private final ResourceID resourceID;
 
 		private final int dataPort;
 
@@ -157,14 +126,12 @@ public class TaskExecutorToResourceManagerConnection
 				ResourceManagerId resourceManagerId,
 				String taskExecutorAddress,
 				ResourceID resourceID,
-				SlotReport slotReport,
 				int dataPort,
 				HardwareDescription hardwareDescription) {
 
 			super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, resourceManagerId);
 			this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
 			this.resourceID = checkNotNull(resourceID);
-			this.slotReport = checkNotNull(slotReport);
 			this.dataPort = dataPort;
 			this.hardwareDescription = checkNotNull(hardwareDescription);
 		}
@@ -177,7 +144,6 @@ public class TaskExecutorToResourceManagerConnection
 			return resourceManager.registerTaskExecutor(
 				taskExecutorAddress,
 				resourceID,
-				slotReport,
 				dataPort,
 				hardwareDescription,
 				timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/47dc6997/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index a1b3fb6..8898ef0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -55,7 +55,6 @@ import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -540,12 +539,10 @@ public class ResourceManagerTest extends TestLogger {
 			final UUID rmLeaderSessionId = UUID.randomUUID();
 			rmLeaderElectionService.isLeader(rmLeaderSessionId).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
-			final SlotReport slotReport = new SlotReport();
 			// test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time
 			CompletableFuture<RegistrationResponse> successfulFuture = rmGateway.registerTaskExecutor(
 				taskManagerAddress,
 				taskManagerResourceID,
-				slotReport,
 				dataPort,
 				hardwareDescription,
 				timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/47dc6997/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 58dc6ca..540db69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -28,20 +28,20 @@ import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
-import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -65,8 +65,6 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
 	private TestingRpcService rpcService;
 
-	private SlotReport slotReport = new SlotReport();
-
 	private int dataPort = 1234;
 
 	private HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
@@ -115,7 +113,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		try {
 			// test response successful
 			CompletableFuture<RegistrationResponse> successfulFuture =
-				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, dataPort, hardwareDescription, timeout);
+				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, timeout);
 			RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
 			final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
@@ -125,7 +123,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
 			// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
 			CompletableFuture<RegistrationResponse> duplicateFuture =
-				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, dataPort, hardwareDescription, timeout);
+				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, timeout);
 			RegistrationResponse duplicateResponse = duplicateFuture.get();
 			assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
 			assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
@@ -144,7 +142,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		try {
 			// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
 			CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
-				wronglyFencedGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, dataPort, hardwareDescription, timeout);
+				wronglyFencedGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, dataPort, hardwareDescription, timeout);
 
 			try {
 				unMatchedLeaderFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
@@ -168,7 +166,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 			// test throw exception when receive a registration from taskExecutor which takes invalid address
 			String invalidAddress = "/taskExecutor2";
 			CompletableFuture<RegistrationResponse> invalidAddressFuture =
-				rmGateway.registerTaskExecutor(invalidAddress, taskExecutorResourceID, slotReport, dataPort, hardwareDescription, timeout);
+				rmGateway.registerTaskExecutor(invalidAddress, taskExecutorResourceID, dataPort, hardwareDescription, timeout);
 			assertTrue(invalidAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS) instanceof RegistrationResponse.Decline);
 		} finally {
 			if (testingFatalErrorHandler.hasExceptionOccurred()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/47dc6997/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 08f5605..f5b6d3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -120,7 +119,6 @@ public class ResourceManagerTest extends TestLogger {
 			CompletableFuture<RegistrationResponse> registrationResponseFuture = resourceManagerGateway.registerTaskExecutor(
 				taskExecutorGateway.getAddress(),
 				taskManagerId,
-				new SlotReport(),
 				dataPort,
 				hardwareDescription,
 				TestingUtils.TIMEOUT());

http://git-wip-us.apache.org/repos/asf/flink/blob/47dc6997/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 56ec12b..af6f3e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -20,12 +20,14 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -40,12 +42,15 @@ import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.util.Arrays;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
@@ -73,6 +78,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the {@link SlotManager}.
+ */
 public class SlotManagerTest extends TestLogger {
 
 	/**
@@ -1139,6 +1147,61 @@ public class SlotManagerTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the SlotManager retries allocating a slot if the TaskExecutor#requestSlot call
+	 * fails.
+	 */
+	@Test
+	public void testSlotRequestFailure() throws Exception {
+		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions())) {
+
+			final SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
+			slotManager.registerSlotRequest(slotRequest);
+
+			final BlockingQueue<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestSlotQueue = new ArrayBlockingQueue<>(1);
+			final BlockingQueue<CompletableFuture<Acknowledge>> responseQueue = new ArrayBlockingQueue<>(1);
+
+			final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+				.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> {
+					requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple5);
+					try {
+						return responseQueue.take();
+					} catch (InterruptedException ignored) {
+						return FutureUtils.completedExceptionally(new FlinkException("Response queue was interrupted."));
+					}
+				})
+				.createTestingTaskExecutorGateway();
+
+			final ResourceID taskExecutorResourceId = ResourceID.generate();
+			final TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, testingTaskExecutorGateway);
+			final SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceId, 0), ResourceProfile.UNKNOWN));
+
+			final CompletableFuture<Acknowledge> firstManualSlotRequestResponse = new CompletableFuture<>();
+			responseQueue.offer(firstManualSlotRequestResponse);
+
+			slotManager.registerTaskManager(taskExecutionConnection, slotReport);
+
+			final Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId> firstRequest = requestSlotQueue.take();
+
+			final CompletableFuture<Acknowledge> secondManualSlotRequestResponse = new CompletableFuture<>();
+			responseQueue.offer(secondManualSlotRequestResponse);
+
+			// fail first request
+			firstManualSlotRequestResponse.completeExceptionally(new SlotAllocationException("Test exception"));
+
+			final Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId> secondRequest = requestSlotQueue.take();
+
+			assertThat(secondRequest.f2, equalTo(firstRequest.f2));
+			assertThat(secondRequest.f0, equalTo(firstRequest.f0));
+
+			secondManualSlotRequestResponse.complete(Acknowledge.get());
+
+			final TaskManagerSlot slot = slotManager.getSlot(secondRequest.f0);
+			assertThat(slot.getState(), equalTo(TaskManagerSlot.State.ALLOCATED));
+			assertThat(slot.getAllocationId(), equalTo(secondRequest.f2));
+		}
+	}
+
 	private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
 		SlotManager slotManager = new SlotManager(
 			TestingUtils.defaultScheduledExecutor(),

http://git-wip-us.apache.org/repos/asf/flink/blob/47dc6997/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
new file mode 100644
index 0000000..915f142
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+
+/**
+ * Testing implementation of the {@link ResourceActions}.
+ */
+public class TestingResourceActions implements ResourceActions {
+	@Override
+	public void releaseResource(InstanceID instanceId, Exception cause) {
+
+	}
+
+	@Override
+	public void allocateResource(ResourceProfile resourceProfile) {
+
+	}
+
+	@Override
+	public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/47dc6997/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index 106fd73..daa2383 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.resourcemanager.utils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -50,6 +50,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -78,12 +79,18 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 
 	private volatile Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer;
 
-	private volatile Function<Tuple5<String, ResourceID, SlotReport, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> registerTaskExecutorFunction;
+	private volatile Function<Tuple4<String, ResourceID, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> registerTaskExecutorFunction;
 
 	private volatile Function<Tuple2<ResourceID, FileType>, CompletableFuture<TransientBlobKey>> requestTaskManagerFileUploadFunction;
 
 	private volatile Consumer<Tuple2<ResourceID, Throwable>> disconnectTaskExecutorConsumer;
 
+	private volatile Function<Tuple3<ResourceID, InstanceID, SlotReport>, CompletableFuture<Acknowledge>> sendSlotReportFunction;
+
+	private volatile BiConsumer<ResourceID, SlotReport> taskExecutorHeartbeatConsumer;
+
+	private volatile Consumer<Tuple3<InstanceID, SlotID, AllocationID>> notifySlotAvailableConsumer;
+
 	public TestingResourceManagerGateway() {
 		this(
 			ResourceManagerId.generate(),
@@ -133,7 +140,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 		this.disconnectJobManagerConsumer = disconnectJobManagerConsumer;
 	}
 
-	public void setRegisterTaskExecutorFunction(Function<Tuple5<String, ResourceID, SlotReport, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> registerTaskExecutorFunction) {
+	public void setRegisterTaskExecutorFunction(Function<Tuple4<String, ResourceID, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> registerTaskExecutorFunction) {
 		this.registerTaskExecutorFunction = registerTaskExecutorFunction;
 	}
 
@@ -145,6 +152,18 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 		this.disconnectTaskExecutorConsumer = disconnectTaskExecutorConsumer;
 	}
 
+	public void setSendSlotReportFunction(Function<Tuple3<ResourceID, InstanceID, SlotReport>, CompletableFuture<Acknowledge>> sendSlotReportFunction) {
+		this.sendSlotReportFunction = sendSlotReportFunction;
+	}
+
+	public void setTaskExecutorHeartbeatConsumer(BiConsumer<ResourceID, SlotReport> taskExecutorHeartbeatConsumer) {
+		this.taskExecutorHeartbeatConsumer = taskExecutorHeartbeatConsumer;
+	}
+
+	public void setNotifySlotAvailableConsumer(Consumer<Tuple3<InstanceID, SlotID, AllocationID>> notifySlotAvailableConsumer) {
+		this.notifySlotAvailableConsumer = notifySlotAvailableConsumer;
+	}
+
 	@Override
 	public CompletableFuture<RegistrationResponse> registerJobManager(JobMasterId jobMasterId, ResourceID jobMasterResourceId, String jobMasterAddress, JobID jobId, Time timeout) {
 		final Consumer<Tuple4<JobMasterId, ResourceID, String, JobID>> currentConsumer = registerJobManagerConsumer;
@@ -187,11 +206,22 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 	}
 
 	@Override
-	public CompletableFuture<RegistrationResponse> registerTaskExecutor(String taskExecutorAddress, ResourceID resourceId, SlotReport slotReport, int dataPort, HardwareDescription hardwareDescription, Time timeout) {
-		final Function<Tuple5<String, ResourceID, SlotReport, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> currentFunction = registerTaskExecutorFunction;
+	public CompletableFuture<Acknowledge> sendSlotReport(ResourceID taskManagerResourceId, InstanceID taskManagerRegistrationId, SlotReport slotReport, Time timeout) {
+		final Function<Tuple3<ResourceID, InstanceID, SlotReport>, CompletableFuture<Acknowledge>> currentSendSlotReportFunction = sendSlotReportFunction;
+
+		if (currentSendSlotReportFunction != null) {
+			return currentSendSlotReportFunction.apply(Tuple3.of(taskManagerResourceId, taskManagerRegistrationId, slotReport));
+		} else {
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		}
+	}
+
+	@Override
+	public CompletableFuture<RegistrationResponse> registerTaskExecutor(String taskExecutorAddress, ResourceID resourceId, int dataPort, HardwareDescription hardwareDescription, Time timeout) {
+		final Function<Tuple4<String, ResourceID, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> currentFunction = registerTaskExecutorFunction;
 
 		if (currentFunction != null) {
-			return currentFunction.apply(Tuple5.of(taskExecutorAddress, resourceId, slotReport, dataPort, hardwareDescription));
+			return currentFunction.apply(Tuple4.of(taskExecutorAddress, resourceId, dataPort, hardwareDescription));
 		} else {
 			return CompletableFuture.completedFuture(
 				new TaskExecutorRegistrationSuccess(
@@ -204,7 +234,11 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 
 	@Override
 	public void notifySlotAvailable(InstanceID instanceId, SlotID slotID, AllocationID oldAllocationId) {
+		final Consumer<Tuple3<InstanceID, SlotID, AllocationID>> currentNotifySlotAvailableConsumer = notifySlotAvailableConsumer;
 
+		if (currentNotifySlotAvailableConsumer != null) {
+			currentNotifySlotAvailableConsumer.accept(Tuple3.of(instanceId, slotID, oldAllocationId));
+		}
 	}
 
 	@Override
@@ -229,7 +263,11 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 
 	@Override
 	public void heartbeatFromTaskManager(ResourceID heartbeatOrigin, SlotReport slotReport) {
+		final BiConsumer<ResourceID, SlotReport> currentTaskExecutorHeartbeatConsumer = taskExecutorHeartbeatConsumer;
 
+		if (currentTaskExecutorHeartbeatConsumer != null) {
+			currentTaskExecutorHeartbeatConsumer.accept(heartbeatOrigin, slotReport);
+		}
 	}
 
 	@Override