You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this text.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/10 12:46:07 UTC

[3/6] flink git commit: [FLINK-8389] [flip6] Release all slots upon closing of JobManager

[FLINK-8389] [flip6] Release all slots upon closing of JobManager

This closes #5265.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9541afd2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9541afd2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9541afd2

Branch: refs/heads/master
Commit: 9541afd2c53fc55cee7f0f45b4e16377803f1388
Parents: ff67094
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 1 15:02:09 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 13:45:19 2018 +0100

----------------------------------------------------------------------
 .../slots/ActorTaskManagerGateway.java          |   6 +
 .../jobmanager/slots/TaskManagerGateway.java    |  15 ++
 .../flink/runtime/jobmaster/JobMaster.java      |  90 +++++++----
 .../runtime/jobmaster/JobMasterGateway.java     |   3 +-
 .../jobmaster/RpcTaskManagerGateway.java        |   9 ++
 .../runtime/jobmaster/slotpool/SlotPool.java    | 126 ++++++++++-----
 .../jobmaster/slotpool/SlotPoolGateway.java     |   4 +-
 .../runtime/taskexecutor/JobLeaderService.java  |   3 +-
 .../runtime/taskexecutor/TaskExecutor.java      | 137 +++++++----------
 .../taskexecutor/TaskExecutorGateway.java       |  13 ++
 .../runtime/taskexecutor/slot/TaskSlot.java     |  30 ++--
 .../taskexecutor/slot/TaskSlotTable.java        |  26 ++--
 .../runtime/taskexecutor/slot/TimerService.java |   9 +-
 .../ExecutionGraphRestartTest.java              |  14 +-
 .../utils/SimpleAckingTaskManagerGateway.java   |  39 +++--
 .../scheduler/DummyScheduledUnit.java           |  34 +++++
 .../jobmaster/slotpool/SlotPoolTest.java        | 152 +++++++++++++------
 .../TestingTaskExecutorGateway.java             |   5 +
 .../src/test/resources/log4j-test.properties    |  38 +++++
 19 files changed, 498 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
index dc5d8c0..6b88752 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -218,6 +219,11 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
 		return requestTaskManagerLog((TaskManagerMessages.RequestTaskManagerLog) TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
 	}
 
+	@Override
+	public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
+		throw new UnsupportedOperationException("The old TaskManager does not support freeing slots");
+	}
+
 	private CompletableFuture<TransientBlobKey> requestTaskManagerLog(TaskManagerMessages.RequestTaskManagerLog request, Time timeout) {
 		Preconditions.checkNotNull(request);
 		Preconditions.checkNotNull(timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index 682441a..b2aca32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTrace;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import org.apache.flink.runtime.rpc.RpcTimeout;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -186,4 +188,17 @@ public interface TaskManagerGateway {
 	 * @return Future blob key under which the task manager stdout file has been stored
 	 */
 	CompletableFuture<TransientBlobKey> requestTaskManagerStdout(final Time timeout);
+
+	/**
+	 * Frees the slot with the given allocation ID.
+	 *
+	 * @param allocationId identifying the slot to free
+	 * @param cause of the freeing operation
+	 * @param timeout for the operation
+	 * @return Future acknowledge which is returned once the slot has been freed
+	 */
+	CompletableFuture<Acknowledge> freeSlot(
+		final AllocationID allocationId,
+		final Throwable cause,
+		@RpcTimeout final Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 7a2844d..b81a8c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -55,8 +55,6 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -65,6 +63,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -96,6 +96,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
@@ -110,13 +111,16 @@ import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -140,8 +144,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	// ------------------------------------------------------------------------
 
-	private final JobMasterGateway selfGateway;
-
 	private final ResourceID resourceId;
 
 	/** Logical representation of the job. */
@@ -226,9 +228,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			@Nullable String restAddress,
 			@Nullable String metricQueryServicePath) throws Exception {
 
-		super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME));
+		super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));
 
-		selfGateway = getSelfGateway(JobMasterGateway.class);
+		final JobMasterGateway selfGateway = getSelfGateway(JobMasterGateway.class);
 
 		this.resourceId = checkNotNull(resourceId);
 		this.jobGraph = checkNotNull(jobGraph);
@@ -362,13 +364,47 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	 */
 	@Override
 	public void postStop() throws Exception {
+		log.info("Stopping the JobMaster for job " + jobGraph.getName() + '(' + jobGraph.getJobID() + ").");
+
+		// disconnect from all registered TaskExecutors
+		final Set<ResourceID> taskManagerResourceIds = new HashSet<>(registeredTaskManagers.keySet());
+		final FlinkException cause = new FlinkException("Stopping JobMaster for job " + jobGraph.getName() +
+			'(' + jobGraph.getJobID() + ").");
+
+		for (ResourceID taskManagerResourceId : taskManagerResourceIds) {
+			disconnectTaskManager(taskManagerResourceId, cause);
+		}
+
 		taskManagerHeartbeatManager.stop();
 		resourceManagerHeartbeatManager.stop();
 
 		// make sure there is a graceful exit
 		suspendExecution(new Exception("JobManager is shutting down."));
 
-		super.postStop();
+		// shut down will internally release all registered slots
+		slotPool.shutDown();
+		CompletableFuture<Boolean> terminationFuture = slotPool.getTerminationFuture();
+
+		Exception exception = null;
+
+		// wait for the slot pool shut down
+		try {
+			terminationFuture.get(rpcTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		} catch (Exception e) {
+			exception = e;
+		}
+
+		try {
+			super.postStop();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		if (exception != null) {
+			throw exception;
+		}
+
+		log.info("Stopped the JobMaster for job " + jobGraph.getName() + '(' + jobGraph.getJobID() + ").");
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -507,9 +543,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	@Override
-	public void disconnectTaskManager(final ResourceID resourceID, final Exception cause) {
+	public CompletableFuture<Acknowledge> disconnectTaskManager(final ResourceID resourceID, final Exception cause) {
 		taskManagerHeartbeatManager.unmonitorTarget(resourceID);
-		slotPoolGateway.releaseTaskManager(resourceID);
+		CompletableFuture<Acknowledge> releaseFuture = slotPoolGateway.releaseTaskManager(resourceID);
 
 		Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection = registeredTaskManagers.remove(resourceID);
 
@@ -517,6 +553,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			taskManagerConnection.f1.disconnectJobManager(jobGraph.getJobID(), cause);
 		}
 
+		return releaseFuture;
 	}
 
 	// TODO: This method needs a leader session ID
@@ -537,14 +574,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			checkpointState);
 
 		if (checkpointCoordinator != null) {
-			getRpcService().execute(new Runnable() {
-				@Override
-				public void run() {
-					try {
-						checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
-					} catch (Throwable t) {
-						log.warn("Error while processing checkpoint acknowledgement message");
-					}
+			getRpcService().execute(() -> {
+				try {
+					checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
+				} catch (Throwable t) {
+					log.warn("Error while processing checkpoint acknowledgement message");
 				}
 			});
 		} else {
@@ -1138,12 +1172,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		@Override
 		protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					resourceManagerResourceID = success.getResourceManagerResourceId();
-					establishResourceManagerConnection(success);
-				}
+			runAsync(() -> {
+				resourceManagerResourceID = success.getResourceManagerResourceId();
+				establishResourceManagerConnection(success);
 			});
 		}
 
@@ -1209,15 +1240,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		@Override
 		public void notifyHeartbeatTimeout(final ResourceID resourceId) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
+			runAsync(() -> {
+				log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
 
-					closeResourceManagerConnection(
-						new TimeoutException(
-							"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
-				}
+				closeResourceManagerConnection(
+					new TimeoutException(
+						"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
 			});
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 09d995e..0d896ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -131,8 +131,9 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
 	 *
 	 * @param resourceID identifying the TaskManager to disconnect
 	 * @param cause for the disconnection of the TaskManager
+	 * @return Future acknowledge once the JobMaster has been disconnected from the TaskManager
 	 */
-	void disconnectTaskManager(ResourceID resourceID, Exception cause);
+	CompletableFuture<Acknowledge> disconnectTaskManager(ResourceID resourceID, Exception cause);
 
 	/**
 	 * Disconnects the resource manager from the job manager because of the given cause.

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index 849a163..83b8999 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -132,4 +133,12 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
 //		return taskExecutorGateway.requestTaskManagerStdout(timeout);
 		throw new UnsupportedOperationException("Operation is not yet supported.");
 	}
+
+	@Override
+	public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
+		return taskExecutorGateway.freeSlot(
+			allocationId,
+			cause,
+			timeout);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 996e445..a56335c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -51,9 +51,6 @@ import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.Nullable;
 
 import java.util.Collection;
@@ -89,16 +86,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedSlotActions {
 
-	/** The log for the pool - shared also with the internal classes. */
-	static final Logger LOG = LoggerFactory.getLogger(SlotPool.class);
-
 	// ------------------------------------------------------------------------
 
 	private static final Time DEFAULT_SLOT_REQUEST_TIMEOUT = Time.minutes(5);
 
 	private static final Time DEFAULT_RM_ALLOCATION_TIMEOUT = Time.minutes(10);
 
-	private static final Time DEFAULT_RM_REQUEST_TIMEOUT = Time.seconds(10);
+	private static final Time DEFAULT_TIMEOUT = Time.seconds(10);
 
 	// ------------------------------------------------------------------------
 
@@ -121,8 +115,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	/** The requests that are waiting for the resource manager to be connected. */
 	private final HashMap<SlotRequestId, PendingRequest> waitingForResourceManager;
 
-	/** Timeout for request calls to the ResourceManager. */
-	private final Time resourceManagerRequestsTimeout;
+	/** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor). */
+	private final Time timeout;
 
 	/** Timeout for allocation round trips (RM -> launch TM -> offer slot). */
 	private final Time resourceManagerAllocationTimeout;
@@ -143,8 +137,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	// ------------------------------------------------------------------------
 
 	public SlotPool(RpcService rpcService, JobID jobId) {
-		this(rpcService, jobId, SystemClock.getInstance(),
-				DEFAULT_SLOT_REQUEST_TIMEOUT, DEFAULT_RM_ALLOCATION_TIMEOUT, DEFAULT_RM_REQUEST_TIMEOUT);
+		this(
+			rpcService,
+			jobId,
+			SystemClock.getInstance(),
+			DEFAULT_SLOT_REQUEST_TIMEOUT,
+			DEFAULT_RM_ALLOCATION_TIMEOUT,
+			DEFAULT_TIMEOUT);
 	}
 
 	public SlotPool(
@@ -159,10 +158,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 
 		this.jobId = checkNotNull(jobId);
 		this.clock = checkNotNull(clock);
-		this.resourceManagerRequestsTimeout = checkNotNull(resourceManagerRequestTimeout);
+		this.timeout = checkNotNull(resourceManagerRequestTimeout);
 		this.resourceManagerAllocationTimeout = checkNotNull(resourceManagerAllocationTimeout);
 
-		this.registeredTaskManagers = new HashSet<>();
+		this.registeredTaskManagers = new HashSet<>(16);
 		this.allocatedSlots = new AllocatedSlots();
 		this.availableSlots = new AvailableSlots();
 		this.pendingRequests = new DualKeyMap<>(16);
@@ -204,6 +203,25 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		}
 	}
 
+	@Override
+	public void postStop() throws Exception {
+		// cancel all pending allocations
+		Set<AllocationID> allocationIds = pendingRequests.keySetB();
+
+		for (AllocationID allocationId : allocationIds) {
+			resourceManagerGateway.cancelSlotRequest(allocationId);
+		}
+
+		// release all registered slots by releasing the corresponding TaskExecutors
+		for (ResourceID taskManagerResourceId : registeredTaskManagers) {
+			releaseTaskManagerInternal(taskManagerResourceId);
+		}
+
+		clear();
+
+		super.postStop();
+	}
+
 	/**
 	 * Suspends this pool, meaning it has lost its authority to accept and distribute slots.
 	 */
@@ -220,9 +238,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 
 		// Clear (but not release!) the available slots. The TaskManagers should re-register them
 		// at the new leader JobManager/SlotPool
-		availableSlots.clear();
-		allocatedSlots.clear();
-		pendingRequests.clear();
+		clear();
 	}
 
 	// ------------------------------------------------------------------------
@@ -644,7 +660,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		Preconditions.checkNotNull(resourceManagerGateway);
 		Preconditions.checkNotNull(pendingRequest);
 
-		LOG.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId());
+		log.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId());
 
 		final AllocationID allocationId = new AllocationID();
 
@@ -660,7 +676,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
 			jobMasterId,
 			new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
-			resourceManagerRequestsTimeout);
+			timeout);
 
 		CompletableFuture<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync(
 			(Acknowledge value) -> {
@@ -695,8 +711,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 			request.getAllocatedSlotFuture().completeExceptionally(new NoResourceAvailableException(
 					"No pooled slot available and request to ResourceManager for new slot failed", failure));
 		} else {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Unregistered slot request {} failed.", slotRequestID, failure);
+			if (log.isDebugEnabled()) {
+				log.debug("Unregistered slot request {} failed.", slotRequestID, failure);
 			}
 		}
 	}
@@ -710,7 +726,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 
 	private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) {
 
-		LOG.info("Cannot serve slot request, no ResourceManager connected. " +
+		log.info("Cannot serve slot request, no ResourceManager connected. " +
 				"Adding as pending request {}",  pendingRequest.getSlotRequestId());
 
 		waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest);
@@ -720,7 +736,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 			public void run() {
 				checkTimeoutRequestWaitingForResourceManager(pendingRequest.getSlotRequestId());
 			}
-		}, resourceManagerRequestsTimeout);
+		}, timeout);
 	}
 
 	private void checkTimeoutRequestWaitingForResourceManager(SlotRequestId slotRequestId) {
@@ -802,7 +818,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		Preconditions.checkNotNull(e);
 
 		if (!pendingRequest.getAllocatedSlotFuture().isDone()) {
-			LOG.info("Failing pending request {}.", pendingRequest.getSlotRequestId());
+			log.info("Failing pending request {}.", pendingRequest.getSlotRequestId());
 			pendingRequest.getAllocatedSlotFuture().completeExceptionally(e);
 		}
 	}
@@ -833,13 +849,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
 
 		if (pendingRequest != null) {
-			LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
+			log.debug("Fulfilling pending request [{}] early with returned slot [{}]",
 				pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
 
 			allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
 			pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
 		} else {
-			LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
+			log.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
 			availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
 		}
 	}
@@ -932,14 +948,14 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		final AllocationID allocationID = slotOffer.getAllocationId();
 
 		if (!registeredTaskManagers.contains(resourceID)) {
-			LOG.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
+			log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
 					slotOffer.getAllocationId(), taskManagerLocation);
 			return CompletableFuture.completedFuture(false);
 		}
 
 		// check whether we have already using this slot
 		if (allocatedSlots.contains(allocationID) || availableSlots.contains(allocationID)) {
-			LOG.debug("Received repeated offer for slot [{}]. Ignoring.", allocationID);
+			log.debug("Received repeated offer for slot [{}]. Ignoring.", allocationID);
 
 			// return true here so that the sender will get a positive acknowledgement to the retry
 			// and mark the offering as a success
@@ -1003,7 +1019,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 			failPendingRequest(pendingRequest, cause);
 		}
 		else if (availableSlots.tryRemove(allocationID)) {
-			LOG.debug("Failed available slot [{}] with ", allocationID, cause);
+			log.debug("Failed available slot [{}] with ", allocationID, cause);
 		}
 		else {
 			AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID);
@@ -1013,7 +1029,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 				allocatedSlot.releasePayload(cause);
 			}
 			else {
-				LOG.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
+				log.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
 			}
 		}
 		// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase
@@ -1041,21 +1057,48 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	 * Unregister TaskManager from this pool, all the related slots will be released and tasks be canceled. Called
 	 * when we find some TaskManager becomes "dead" or "abnormal", and we decide to not using slots from it anymore.
 	 *
-	 * @param resourceID The id of the TaskManager
+	 * @param resourceId The id of the TaskManager
 	 */
 	@Override
-	public CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceID) {
-		if (registeredTaskManagers.remove(resourceID)) {
-			availableSlots.removeAllForTaskManager(resourceID);
+	public CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId) {
+		if (registeredTaskManagers.remove(resourceId)) {
+			releaseTaskManagerInternal(resourceId);
+		}
 
-			final Set<AllocatedSlot> allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID);
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
 
-			for (AllocatedSlot allocatedSlot : allocatedSlotsForResource) {
-				allocatedSlot.releasePayload(new FlinkException("TaskManager " + resourceID + " was released."));
-			}
+	// ------------------------------------------------------------------------
+	//  Internal methods
+	// ------------------------------------------------------------------------
+
+	private void releaseTaskManagerInternal(final ResourceID resourceId) {
+		final FlinkException cause = new FlinkException("Releasing TaskManager " + resourceId + '.');
+
+		final Set<AllocatedSlot> removedSlots = new HashSet<>(allocatedSlots.removeSlotsForTaskManager(resourceId));
+
+		for (AllocatedSlot allocatedSlot : removedSlots) {
+			allocatedSlot.releasePayload(cause);
 		}
 
-		return CompletableFuture.completedFuture(Acknowledge.get());
+		removedSlots.addAll(availableSlots.removeAllForTaskManager(resourceId));
+
+		for (AllocatedSlot removedSlot : removedSlots) {
+			TaskManagerGateway taskManagerGateway = removedSlot.getTaskManagerGateway();
+			taskManagerGateway.freeSlot(removedSlot.getAllocationId(), cause, timeout);
+		}
+	}
+
+	/**
+	 * Clear the internal state of the SlotPool.
+	 */
+	private void clear() {
+		availableSlots.clear();
+		allocatedSlots.clear();
+		pendingRequests.clear();
+		waitingForResourceManager.clear();
+		registeredTaskManagers.clear();
+		slotSharingManagers.clear();
 	}
 
 	// ------------------------------------------------------------------------
@@ -1363,8 +1406,9 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		 * Remove all available slots come from specified TaskManager.
 		 *
 		 * @param taskManager The id of the TaskManager
+		 * @return The set of removed slots for the given TaskManager
 		 */
-		void removeAllForTaskManager(final ResourceID taskManager) {
+		Set<AllocatedSlot> removeAllForTaskManager(final ResourceID taskManager) {
 			// remove from the by-TaskManager view
 			final Set<AllocatedSlot> slotsForTm = availableSlotsByTaskManager.remove(taskManager);
 
@@ -1381,6 +1425,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 				if (slotsForHost.isEmpty()) {
 					availableSlotsByHost.remove(host);
 				}
+
+				return slotsForTm;
+			} else {
+				return Collections.emptySet();
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
index 7a627b4..0df6262 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
@@ -84,10 +84,10 @@ public interface SlotPoolGateway extends AllocatedSlotActions, RpcGateway {
 	/**
 	 * Releases a TaskExecutor with the given {@link ResourceID} from the {@link SlotPool}.
 	 *
-	 * @param resourceID identifying the TaskExecutor which shall be released from the SlotPool
+	 * @param resourceId identifying the TaskExecutor which shall be released from the SlotPool
 	 * @return Future acknowledge which is completed after the TaskExecutor has been released
 	 */
-	CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID);
+	CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId);
 
 	/**
 	 * Offers a slot to the {@link SlotPool}. The slot offer can be accepted or

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 20dcfa9..77737e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -356,8 +356,7 @@ public class JobLeaderService {
 	 * Retrying registration for the job manager <--> task manager connection.
 	 */
 	private static final class JobManagerRetryingRegistration
-			extends RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess>
-	{
+			extends RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> {
 
 		private final String taskManagerRpcAddress;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index a348948..5577472 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -113,34 +113,34 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 	public static final String TASK_MANAGER_NAME = "taskmanager";
 
-	/** The connection information of this task manager */
+	/** The connection information of this task manager. */
 	private final TaskManagerLocation taskManagerLocation;
 
-	/** Max blob port which is accepted */
+	/** Max blob port which is accepted. */
 	public static final int MAX_BLOB_PORT = 65536;
 
-	/** The access to the leader election and retrieval services */
+	/** The access to the leader election and retrieval services. */
 	private final HighAvailabilityServices haServices;
 
-	/** The task manager configuration */
+	/** The task manager configuration. */
 	private final TaskManagerConfiguration taskManagerConfiguration;
 
-	/** The I/O manager component in the task manager */
+	/** The I/O manager component in the task manager. */
 	private final IOManager ioManager;
 
-	/** The memory manager component in the task manager */
+	/** The memory manager component in the task manager. */
 	private final MemoryManager memoryManager;
 
-	/** The network component in the task manager */
+	/** The network component in the task manager. */
 	private final NetworkEnvironment networkEnvironment;
 
-	/** The heartbeat manager for job manager in the task manager */
+	/** The heartbeat manager for job manager in the task manager. */
 	private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
 
-	/** The heartbeat manager for resource manager in the task manager */
+	/** The heartbeat manager for resource manager in the task manager. */
 	private final HeartbeatManager<Void, SlotReport> resourceManagerHeartbeatManager;
 
-	/** The fatal error handler to use in case of a fatal error */
+	/** The fatal error handler to use in case of a fatal error. */
 	private final FatalErrorHandler fatalErrorHandler;
 
 	private final TaskManagerMetricGroup taskManagerMetricGroup;
@@ -673,6 +673,13 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
+	@Override
+	public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
+		// acknowledge right after delegating; timeout is unused locally (presumably the caller-side @RpcTimeout of TaskExecutorGateway)
+		freeSlotInternal(allocationId, cause);
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
 	// ----------------------------------------------------------------------
 	// Disconnection RPCs
 	// ----------------------------------------------------------------------
@@ -820,7 +827,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 								// We encountered an exception. Free the slots and return them to the RM.
 								for (SlotOffer reservedSlot: reservedSlots) {
-									freeSlot(reservedSlot.getAllocationId(), throwable);
+									freeSlotInternal(reservedSlot.getAllocationId(), throwable);
 								}
 							}
 						} else {
@@ -834,7 +841,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 								final Exception e = new Exception("The slot was rejected by the JobManager.");
 
 								for (SlotOffer rejectedSlot : reservedSlots) {
-									freeSlot(rejectedSlot.getAllocationId(), e);
+									freeSlotInternal(rejectedSlot.getAllocationId(), e);
 								}
 							} else {
 								// discard the response since there is a new leader for the job
@@ -898,20 +905,24 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		// 1. fail tasks running under this JobID
 		Iterator<Task> tasks = taskSlotTable.getTasks(jobId);
 
+		final FlinkException failureCause = new FlinkException("JobManager responsible for " + jobId +
+			" lost the leadership.", cause);
+
 		while (tasks.hasNext()) {
-			tasks.next().failExternally(new Exception("JobManager responsible for " + jobId +
-				" lost the leadership."));
+			tasks.next().failExternally(failureCause);
 		}
 
 		// 2. Move the active slots to state allocated (possible to time out again)
 		Iterator<AllocationID> activeSlots = taskSlotTable.getActiveSlots(jobId);
 
+		final FlinkException freeingCause = new FlinkException("Slot could not be marked inactive.");
+
 		while (activeSlots.hasNext()) {
 			AllocationID activeSlot = activeSlots.next();
 
 			try {
 				if (!taskSlotTable.markSlotInactive(activeSlot, taskManagerConfiguration.getTimeout())) {
-					freeSlot(activeSlot, new Exception("Slot could not be marked inactive."));
+					freeSlotInternal(activeSlot, freeingCause);
 				}
 			} catch (SlotNotFoundException e) {
 				log.debug("Could not mark the slot {} inactive.", jobId, e);
@@ -1017,8 +1028,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 	private void updateTaskExecutionState(
 			final JobMasterGateway jobMasterGateway,
-			final TaskExecutionState taskExecutionState)
-	{
+			final TaskExecutionState taskExecutionState) {
 		final ExecutionAttemptID executionAttemptID = taskExecutionState.getID();
 
 		CompletableFuture<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(taskExecutionState);
@@ -1065,7 +1075,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		}
 	}
 
-	private void freeSlot(AllocationID allocationId, Throwable cause) {
+	private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
 		Preconditions.checkNotNull(allocationId);
 
 		try {
@@ -1085,8 +1095,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		}
 	}
 
-	private void freeSlot(AllocationID allocationId) {
-		freeSlot(allocationId, new Exception("The slot " + allocationId + " is beeing freed."));
+	private void freeSlotInternal(AllocationID allocationId) {
+		freeSlotInternal(allocationId, new Exception("The slot " + allocationId + " is being freed."));
 	}
 
 	private void timeoutSlot(AllocationID allocationId, UUID ticket) {
@@ -1094,7 +1104,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		Preconditions.checkNotNull(ticket);
 
 		if (taskSlotTable.isValidTimeout(allocationId, ticket)) {
-			freeSlot(allocationId, new Exception("The slot " + allocationId + " has timed out."));
+			freeSlotInternal(allocationId, new Exception("The slot " + allocationId + " has timed out."));
 		} else {
 			log.debug("Received an invalid timeout for allocation id {} with ticket {}.", allocationId, ticket);
 		}
@@ -1196,14 +1206,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMasterId) {
 			log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId);
 
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					closeJobManagerConnection(
-						jobId,
-						new Exception("Job leader for job id " + jobId + " lost leadership."));
-				}
-			});
+			runAsync(() ->
+				closeJobManagerConnection(
+					jobId,
+					new Exception("Job leader for job id " + jobId + " lost leadership.")));
 		}
 
 		@Override
@@ -1219,12 +1225,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			final ResourceID resourceManagerId = success.getResourceManagerId();
 
 			runAsync(
-				new Runnable() {
-					@Override
-					public void run() {
-						establishResourceManagerConnection(resourceManagerId);
-					}
-				}
+				() -> establishResourceManagerConnection(resourceManagerId)
 			);
 		}
 
@@ -1243,12 +1244,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		@Override
 		public void notifyFinalState(final ExecutionAttemptID executionAttemptID) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					unregisterTaskAndNotifyFinalState(jobMasterGateway, executionAttemptID);
-				}
-			});
+			runAsync(() -> unregisterTaskAndNotifyFinalState(jobMasterGateway, executionAttemptID));
 		}
 
 		@Override
@@ -1263,12 +1259,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		@Override
 		public void failTask(final ExecutionAttemptID executionAttemptID, final Throwable cause) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					TaskExecutor.this.failTask(executionAttemptID, cause);
-				}
-			});
+			runAsync(() -> TaskExecutor.this.failTask(executionAttemptID, cause));
 		}
 
 		@Override
@@ -1281,22 +1272,12 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		@Override
 		public void freeSlot(final AllocationID allocationId) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					TaskExecutor.this.freeSlot(allocationId);
-				}
-			});
+			runAsync(() -> TaskExecutor.this.freeSlotInternal(allocationId));
 		}
 
 		@Override
 		public void timeoutSlot(final AllocationID allocationId, final UUID ticket) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					TaskExecutor.this.timeoutSlot(allocationId, ticket);
-				}
-			});
+			runAsync(() -> TaskExecutor.this.timeoutSlot(allocationId, ticket));
 		}
 	}
 
@@ -1304,19 +1285,16 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		@Override
 		public void notifyHeartbeatTimeout(final ResourceID resourceID) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("The heartbeat of JobManager with id {} timed out.", resourceID);
-
-					if (jobManagerConnections.containsKey(resourceID)) {
-						JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
-
-						if (jobManagerConnection != null) {
-							closeJobManagerConnection(
-								jobManagerConnection.getJobID(),
-								new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
-						}
+			runAsync(() -> {
+				log.info("The heartbeat of JobManager with id {} timed out.", resourceID);
+
+				if (jobManagerConnections.containsKey(resourceID)) {
+					JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
+
+					if (jobManagerConnection != null) {
+						closeJobManagerConnection(
+							jobManagerConnection.getJobID(),
+							new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
 					}
 				}
 			});
@@ -1337,15 +1315,12 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		@Override
 		public void notifyHeartbeatTimeout(final ResourceID resourceId) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
-
-					closeResourceManagerConnection(
-						new TimeoutException(
-							"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
-				}
+			runAsync(() -> {
+				log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
+
+				closeResourceManagerConnection(
+					new TimeoutException(
+						"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
 			});
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index ee0f69d..3dc80b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -162,4 +162,17 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param cause for the disconnection from the ResourceManager
 	 */
 	void disconnectResourceManager(Exception cause);
+
+	/**
+	 * Frees the slot with the given allocation ID.
+	 *
+	 * @param allocationId identifying the slot to free
+	 * @param cause of the freeing operation
+	 * @param timeout for the RPC call
+	 * @return Future which is completed with an acknowledge once the slot has been freed
+	 */
+	CompletableFuture<Acknowledge> freeSlot(
+		final AllocationID allocationId,
+		final Throwable cause,
+		@RpcTimeout final Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
index 6f5230c..411aa94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -38,34 +38,34 @@ import java.util.Map;
  *     <li>Allocated - The slot has been allocated for a job.</li>
  *     <li>Active - The slot is in active use by a job manager which is the leader of the allocating job.</li>
  * </ul>
- * <p>
- * A task slot can only be allocated if it is in state free. An allocated task slot can transition
+ *
+ * <p>A task slot can only be allocated if it is in state free. An allocated task slot can transition
  * to state active.
- *<p>
- * An active slot allows to add tasks from the respective job and with the correct allocation id.
+ *
+ * <p>An active slot allows adding tasks from the respective job and with the correct allocation id.
  * An active slot can be marked as inactive which sets the state back to allocated.
- * <p>
- * An allocated or active slot can only be freed if it is empty. If it is not empty, then it's state
+ *
+ * <p>An allocated or active slot can only be freed if it is empty. If it is not empty, then its state
  * can be set to releasing indicating that it can be freed once it becomes empty.
  */
 public class TaskSlot {
 
-	/** Index of the task slot */
+	/** Index of the task slot. */
 	private final int index;
 
-	/** Resource characteristics for this slot */
+	/** Resource characteristics for this slot. */
 	private final ResourceProfile resourceProfile;
 
-	/** Tasks running in this slot */
+	/** Tasks running in this slot. */
 	private final Map<ExecutionAttemptID, Task> tasks;
 
-	/** State of this slot */
+	/** State of this slot. */
 	private TaskSlotState state;
 
-	/** Job id to which the slot has been allocated; null if not allocated */
+	/** Job id to which the slot has been allocated; null if not allocated. */
 	private JobID jobId;
 
-	/** Allocation id of this slot; null if not allocated */
+	/** Allocation id of this slot; null if not allocated. */
 	private AllocationID allocationId;
 
 	TaskSlot(final int index, final ResourceProfile resourceProfile) {
@@ -151,7 +151,7 @@ public class TaskSlot {
 	 * task with the same execution attempt id added to the task slot. In this case, the method
 	 * returns true. Otherwise the task slot is left unchanged and false is returned.
 	 *
-	 * In case that the task slot state is not active an {@link IllegalStateException} is thrown.
+	 * <p>In case that the task slot state is not active an {@link IllegalStateException} is thrown.
 	 * In case that the task's job id and allocation id don't match with the job id and allocation
 	 * id for which the task slot has been allocated, an {@link IllegalArgumentException} is thrown.
 	 *
@@ -199,7 +199,7 @@ public class TaskSlot {
 	 * or is already allocated/active for the given job and allocation id, then the method returns
 	 * true. Otherwise it returns false.
 	 *
-	 * A slot can only be allocated if it's current state is free.
+	 * <p>A slot can only be allocated if its current state is free.
 	 *
 	 * @param newJobId to allocate the slot for
 	 * @param newAllocationId to identify the slot allocation
@@ -230,7 +230,7 @@ public class TaskSlot {
 	/**
 	 * Mark this slot as active. A slot can only be marked active if it's in state allocated.
 	 *
-	 * The method returns true if the slot was set to active. Otherwise it returns false.
+	 * <p>The method returns true if the slot was set to active. Otherwise it returns false.
 	 *
 	 * @return True if the new state of the slot is active; otherwise false
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 1384336..62101e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,35 +48,35 @@ import java.util.UUID;
 /**
  * Container for multiple {@link TaskSlot} instances. Additionally, it maintains multiple indices
  * for faster access to tasks and sets of allocated slots.
- * <p>
- * The task slot table automatically registers timeouts for allocated slots which cannot be assigned
+ *
+ * <p>The task slot table automatically registers timeouts for allocated slots which cannot be assigned
  * to a job manager.
- * <p>
- * Before the task slot table can be used, it must be started via the {@link #start} method.
+ *
+ * <p>Before the task slot table can be used, it must be started via the {@link #start} method.
  */
 public class TaskSlotTable implements TimeoutListener<AllocationID> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TaskSlotTable.class);
 
-	/** Timer service used to time out allocated slots */
+	/** Timer service used to time out allocated slots. */
 	private final TimerService<AllocationID> timerService;
 
-	/** The list of all task slots */
+	/** The list of all task slots. */
 	private final List<TaskSlot> taskSlots;
 
-	/** Mapping from allocation id to task slot */
+	/** Mapping from allocation id to task slot. */
 	private final Map<AllocationID, TaskSlot> allocationIDTaskSlotMap;
 
-	/** Mapping from execution attempt id to task and task slot */
+	/** Mapping from execution attempt id to task and task slot. */
 	private final Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;
 
-	/** Mapping from job id to allocated slots for a job */
+	/** Mapping from job id to allocated slots for a job. */
 	private final Map<JobID, Set<AllocationID>> slotsPerJob;
 
-	/** Interface for slot actions, such as freeing them or timing them out */
+	/** Interface for slot actions, such as freeing them or timing them out. */
 	private SlotActions slotActions;
 
-	/** Whether the table has been started */
+	/** Whether the table has been started. */
 	private boolean started;
 
 	public TaskSlotTable(
@@ -555,7 +556,6 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 		private final Task task;
 		private final TaskSlot taskSlot;
 
-
 		private TaskSlotMapping(Task task, TaskSlot taskSlot) {
 			this.task = Preconditions.checkNotNull(task);
 			this.taskSlot = Preconditions.checkNotNull(taskSlot);
@@ -675,7 +675,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	}
 
 	/**
-	 * Iterator over all {@link Task} for a given job
+	 * Iterator over all {@link Task} for a given job.
 	 */
 	private final class TaskIterator implements Iterator<Task> {
 		private final Iterator<TaskSlot> taskSlotIterator;

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
index 8ec9a2e..84fb9e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskexecutor.slot;
 
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,16 +40,16 @@ public class TimerService<K> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TimerService.class);
 
-	/** Executor service for the scheduled timeouts */
+	/** Executor service for the scheduled timeouts. */
 	private final ScheduledExecutorService scheduledExecutorService;
 
 	/** Timeout for the shutdown of the service. */
 	private final long shutdownTimeout;
 
-	/** Map of currently active timeouts */
+	/** Map of currently active timeouts. */
 	private final Map<K, Timeout<K>> timeouts;
 
-	/** Listener which is notified about occurring timeouts */
+	/** Listener which is notified about occurring timeouts. */
 	private TimeoutListener<K> timeoutListener;
 
 	public TimerService(
@@ -79,7 +80,7 @@ public class TimerService<K> {
 		scheduledExecutorService.shutdown();
 
 		try {
-			if(!scheduledExecutorService.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
+			if (!scheduledExecutorService.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
 				LOG.debug("The scheduled executor service did not properly terminate. Shutting " +
 					"it down now.");
 				scheduledExecutorService.shutdownNow();

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 2245a8c..ccaed96 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -599,8 +599,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		WaitForTasks waitForTasks = new WaitForTasks(parallelism);
 		WaitForTasks waitForTasksCancelled = new WaitForTasks(parallelism);
-		taskManagerGateway.setCondition(waitForTasks);
-		taskManagerGateway.setCancelCondition(waitForTasksCancelled);
+		taskManagerGateway.setSubmitConsumer(waitForTasks);
+		taskManagerGateway.setCancelConsumer(waitForTasksCancelled);
 
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.scheduleForExecution();
@@ -649,7 +649,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
 
 		WaitForTasks waitForTasksAfterRestart = new WaitForTasks(parallelism);
-		taskManagerGateway.setCondition(waitForTasksAfterRestart);
+		taskManagerGateway.setSubmitConsumer(waitForTasksAfterRestart);
 
 		waitForTasksCancelled.getFuture().get(1000L, TimeUnit.MILLISECONDS);
 
@@ -685,7 +685,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		final ExecutionGraph eg = createSimpleTestGraph(jid, slots, restartStrategy, vertex);
 
 		WaitForTasks waitForTasks = new WaitForTasks(parallelism);
-		taskManagerGateway.setCondition(waitForTasks);
+		taskManagerGateway.setSubmitConsumer(waitForTasks);
 
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.scheduleForExecution();
@@ -699,7 +699,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		assertEquals(JobStatus.FAILING, eg.getState());
 
 		WaitForTasks waitForTasksRestart = new WaitForTasks(parallelism);
-		taskManagerGateway.setCondition(waitForTasksRestart);
+		taskManagerGateway.setSubmitConsumer(waitForTasksRestart);
 
 		completeCancellingForAllVertices(eg);
 		waitUntilJobStatus(eg, JobStatus.RESTARTING, 1000);
@@ -750,7 +750,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 			new JobID(), scheduler, new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0), executor, source, sink);
 
 		WaitForTasks waitForTasks = new WaitForTasks(parallelism * 2);
-		taskManagerGateway.setCondition(waitForTasks);
+		taskManagerGateway.setSubmitConsumer(waitForTasks);
 
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.scheduleForExecution();
@@ -766,7 +766,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		assertEquals(JobStatus.FAILING, eg.getState());
 
 		WaitForTasks waitForTasksAfterRestart = new WaitForTasks(parallelism * 2);
-		taskManagerGateway.setCondition(waitForTasksAfterRestart);
+		taskManagerGateway.setSubmitConsumer(waitForTasksAfterRestart);
 
 		completeCancellingForAllVertices(eg);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 682705a..628f004 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -20,9 +20,11 @@ package org.apache.flink.runtime.executiongraph.utils;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -46,21 +48,27 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 
 	private final String address = UUID.randomUUID().toString();
 
-	private Optional<Consumer<ExecutionAttemptID>> optSubmitCondition;
+	private Optional<Consumer<ExecutionAttemptID>> optSubmitConsumer;
 
-	private Optional<Consumer<ExecutionAttemptID>> optCancelCondition;
+	private Optional<Consumer<ExecutionAttemptID>> optCancelConsumer;
+
+	private volatile Consumer<Tuple2<AllocationID, Throwable>> freeSlotConsumer;
 
 	public SimpleAckingTaskManagerGateway() {
-		optSubmitCondition = Optional.empty();
-		optCancelCondition = Optional.empty();
+		optSubmitConsumer = Optional.empty();
+		optCancelConsumer = Optional.empty();
+	}
+
+	public void setSubmitConsumer(Consumer<ExecutionAttemptID> submitConsumer) {
+		optSubmitConsumer = Optional.of(submitConsumer); // invoked with the attempt id of every submitted task
 	}
 
-	public void setCondition(Consumer<ExecutionAttemptID> predicate) {
-		optSubmitCondition = Optional.of(predicate);
+	public void setCancelConsumer(Consumer<ExecutionAttemptID> cancelConsumer) {
+		optCancelConsumer = Optional.of(cancelConsumer); // invoked with the attempt id of every cancelled task
 	}
 
-	public void setCancelCondition(Consumer<ExecutionAttemptID> predicate) {
-		optCancelCondition = Optional.of(predicate);
+	public void setFreeSlotConsumer(Consumer<Tuple2<AllocationID, Throwable>> consumer) {
+		this.freeSlotConsumer = consumer; // receives (allocationId, cause) of every freeSlot call; null disables it
 	}
 
 	@Override
@@ -92,7 +100,7 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 
 	@Override
 	public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
-		optSubmitCondition.ifPresent(condition -> condition.accept(tdd.getExecutionAttemptId()));
+		optSubmitConsumer.ifPresent(condition -> condition.accept(tdd.getExecutionAttemptId()));
 		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
@@ -103,7 +111,7 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 
 	@Override
 	public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
-		optCancelCondition.ifPresent(condition -> condition.accept(executionAttemptID));
+		optCancelConsumer.ifPresent(condition -> condition.accept(executionAttemptID));
 		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
@@ -139,4 +147,15 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 	public CompletableFuture<TransientBlobKey> requestTaskManagerStdout(Time timeout) {
 		return FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	}
+
+	@Override
+	public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
+		// read the volatile field once so that the null check and the call see the same consumer
+		final Consumer<Tuple2<AllocationID, Throwable>> consumer = freeSlotConsumer;
+		if (consumer != null) {
+			consumer.accept(Tuple2.of(allocationId, cause));
+		}
+
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DummyScheduledUnit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DummyScheduledUnit.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DummyScheduledUnit.java
new file mode 100644
index 0000000..939aece
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DummyScheduledUnit.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+/**
+ * Dummy implementation of {@link ScheduledUnit} for testing purposes.
+ *
+ * <p>Only a freshly created {@link JobVertexID} is supplied; all other constructor arguments are {@code null}.
+ */
+public class DummyScheduledUnit extends ScheduledUnit {
+
+	/** Creates a new dummy unit with its own job vertex id. */
+	public DummyScheduledUnit() {
+		super(null, new JobVertexID(), null, null);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index 707ea00..6a8ef0a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
@@ -44,33 +44,31 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.RETURNS_MOCKS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 public class SlotPoolTest extends TestLogger {
 
@@ -84,7 +82,7 @@ public class SlotPoolTest extends TestLogger {
 
 	private TaskManagerLocation taskManagerLocation;
 
-	private TaskManagerGateway taskManagerGateway;
+	private SimpleAckingTaskManagerGateway taskManagerGateway;
 
 	@Before
 	public void setUp() throws Exception {
@@ -102,7 +100,10 @@ public class SlotPoolTest extends TestLogger {
 
 	@Test
 	public void testAllocateSimpleSlot() throws Exception {
-		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+		CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest));
+
 		final SlotPool slotPool = new SlotPool(rpcService, jobId);
 
 		try {
@@ -112,17 +113,14 @@ public class SlotPoolTest extends TestLogger {
 			SlotRequestId requestId = new SlotRequestId();
 			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
 				requestId,
-				mock(ScheduledUnit.class),
+				new DummyScheduledUnit(),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
 				timeout);
 			assertFalse(future.isDone());
 
-			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
-			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+			final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 			final SlotOffer slotOffer = new SlotOffer(
 				slotRequest.getAllocationId(),
@@ -136,13 +134,21 @@ public class SlotPoolTest extends TestLogger {
 			assertTrue(slot.isAlive());
 			assertEquals(taskManagerLocation, slot.getTaskManagerLocation());
 		} finally {
-			slotPool.shutDown();
+			RpcUtils.terminateRpcEndpoint(slotPool, timeout);
 		}
 	}
 
 	@Test
 	public void testAllocationFulfilledByReturnedSlot() throws Exception {
-		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+		final ArrayBlockingQueue<SlotRequest> slotRequestQueue = new ArrayBlockingQueue<>(2);
+
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> {
+			while (!slotRequestQueue.offer(slotRequest)) {
+				// noop
+			}
+		});
+
 		final SlotPool slotPool = new SlotPool(rpcService, jobId);
 
 		try {
@@ -151,14 +157,14 @@ public class SlotPoolTest extends TestLogger {
 
 			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(
 				new SlotRequestId(),
-				mock(ScheduledUnit.class),
+				new DummyScheduledUnit(),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
 				timeout);
 			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(
 				new SlotRequestId(),
-				mock(ScheduledUnit.class),
+				new DummyScheduledUnit(),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
@@ -167,11 +173,11 @@ public class SlotPoolTest extends TestLogger {
 			assertFalse(future1.isDone());
 			assertFalse(future2.isDone());
 
-			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds()).times(2))
-				.requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+			final List<SlotRequest> slotRequests = new ArrayList<>(2);
 
-			final List<SlotRequest> slotRequests = slotRequestArgumentCaptor.getAllValues();
+			for (int i = 0; i < 2; i++) {
+				slotRequests.add(slotRequestQueue.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
+			}
 
 			final SlotOffer slotOffer = new SlotOffer(
 				slotRequests.get(0).getAllocationId(),
@@ -198,13 +204,16 @@ public class SlotPoolTest extends TestLogger {
 			assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
 			assertEquals(slot1.getAllocationId(), slot2.getAllocationId());
 		} finally {
-			slotPool.shutDown();
+			RpcUtils.terminateRpcEndpoint(slotPool, timeout);
 		}
 	}
 
 	@Test
 	public void testAllocateWithFreeSlot() throws Exception {
-		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+		final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest));
+
 		final SlotPool slotPool = new SlotPool(rpcService, jobId);
 
 		try {
@@ -213,17 +222,14 @@ public class SlotPoolTest extends TestLogger {
 
 			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(
 				new SlotRequestId(),
-				mock(ScheduledUnit.class),
+				new DummyScheduledUnit(),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
 				timeout);
 			assertFalse(future1.isDone());
 
-			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
-			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+			final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 			final SlotOffer slotOffer = new SlotOffer(
 				slotRequest.getAllocationId(),
@@ -240,7 +246,7 @@ public class SlotPoolTest extends TestLogger {
 
 			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(
 				new SlotRequestId(),
-				mock(ScheduledUnit.class),
+				new DummyScheduledUnit(),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
@@ -262,7 +268,11 @@ public class SlotPoolTest extends TestLogger {
 
 	@Test
 	public void testOfferSlot() throws Exception {
-		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+		final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
+
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest));
+
 		final SlotPool slotPool = new SlotPool(rpcService, jobId);
 
 		try {
@@ -271,17 +281,14 @@ public class SlotPoolTest extends TestLogger {
 
 			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
 				new SlotRequestId(),
-				mock(ScheduledUnit.class),
+				new DummyScheduledUnit(),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
 				timeout);
 			assertFalse(future.isDone());
 
-			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
-			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+			final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 			final SlotOffer slotOffer = new SlotOffer(
 				slotRequest.getAllocationId(),
@@ -320,7 +327,10 @@ public class SlotPoolTest extends TestLogger {
 
 	@Test
 	public void testReleaseResource() throws Exception {
-		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+		final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
+
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest));
 
 		final CompletableFuture<Boolean> slotReturnFuture = new CompletableFuture<>();
 
@@ -347,20 +357,17 @@ public class SlotPoolTest extends TestLogger {
 
 			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(
 				new SlotRequestId(),
-				mock(ScheduledUnit.class),
+				new DummyScheduledUnit(),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
 				timeout);
 
-			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
-			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+			final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(
 				new SlotRequestId(),
-				mock(ScheduledUnit.class),
+				new DummyScheduledUnit(),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
@@ -527,13 +534,58 @@ public class SlotPoolTest extends TestLogger {
 		}
 	}
 
-	private static ResourceManagerGateway createResourceManagerGatewayMock() {
-		ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
-		when(resourceManagerGateway
-			.requestSlot(any(JobMasterId.class), any(SlotRequest.class), any(Time.class)))
-			.thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
+	/**
+	 * Tests that a SlotPool shutdown releases all registered slots.
+	 */
+	@Test
+	public void testShutdownReleasesAllSlots() throws Exception {
+		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+
+		try {
+			final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
+
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
+
+			final int numSlotOffers = 2;
+
+			final Collection<SlotOffer> slotOffers = new ArrayList<>(numSlotOffers);
+
+			for (int i = 0; i < numSlotOffers; i++) {
+				slotOffers.add(new SlotOffer(new AllocationID(), i, ResourceProfile.UNKNOWN));
+			}
+
+			final ArrayBlockingQueue<AllocationID> freedSlotQueue = new ArrayBlockingQueue<>(numSlotOffers);
 
-		return resourceManagerGateway;
+			// offer() only fails while the bounded queue is full; in that case spin until
+			// the test thread has taken an element out of the queue
+			taskManagerGateway.setFreeSlotConsumer(tuple -> {
+				while (!freedSlotQueue.offer(tuple.f0)) {
+					// queue full, retry
+				}
+			});
+
+			final CompletableFuture<Collection<SlotOffer>> acceptedSlotOffersFuture = slotPoolGateway.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
+
+			final Collection<SlotOffer> acceptedSlotOffers = acceptedSlotOffersFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+			assertThat(acceptedSlotOffers, Matchers.equalTo(slotOffers));
+
+			// shut down the slot pool
+			slotPool.shutDown();
+			slotPool.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+			// the shut down operation should have freed all registered slots; poll with a timeout so
+			// that a missing release yields a null entry and fails the assertion instead of hanging
+			final List<AllocationID> freedSlots = new ArrayList<>(numSlotOffers);
+			for (int i = 0; i < numSlotOffers; i++) {
+				freedSlots.add(freedSlotQueue.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
+			}
+
+			assertThat(freedSlots, Matchers.containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(slotPool, timeout);
+		}
 	}
 
 	private static SlotPoolGateway setupSlotPool(

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 94f325d..c6334c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -113,6 +113,11 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 	}
 
 	@Override
+	public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get()); // arguments ignored, always acknowledged
+	}
+
+	@Override
 	public String getAddress() {
 		return address;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-streaming-scala/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/resources/log4j-test.properties b/flink-streaming-scala/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..7ba1633
--- /dev/null
+++ b/flink-streaming-scala/src/test/resources/log4j-test.properties
@@ -0,0 +1,38 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, console
+
+# -----------------------------------------------------------------------------
+# Console (use 'console')
+# -----------------------------------------------------------------------------
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# -----------------------------------------------------------------------------
+# File (use 'file')
+# -----------------------------------------------------------------------------
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.file=${log.dir}/${mvn.forkNumber}.log
+log4j.appender.file.append=false
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console