You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/17 13:52:34 UTC

[flink] 02/02: [FLINK-9912][JM] Release TaskExecutors if they have no slots registered at SlotPool

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e3c98f05d3544d0165c2d97d2d00fcd295cef8c8
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sun Jul 22 22:58:18 2018 +0200

    [FLINK-9912][JM] Release TaskExecutors if they have no slots registered at SlotPool
    
    This commit extends the SlotPool's behaviour when failing an allocation by sending a notification
    message to the TaskExecutor about the freed slot. Moreover, it checks whether the affected
    TaskExecutor has more slots registered or not. In the latter case, the TaskExecutor's connection
    will be eagerly closed.
    
    This closes #6394.
---
 .../apache/flink/types/SerializableOptional.java   |  69 +++++++++++
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  17 ++-
 .../flink/runtime/jobmaster/slotpool/SlotPool.java |  57 ++++++---
 .../jobmaster/slotpool/SlotPoolGateway.java        |   4 +-
 .../utils/SimpleAckingTaskManagerGateway.java      |  14 ++-
 .../flink/runtime/jobmaster/JobMasterTest.java     |  94 +++++++++++---
 .../runtime/jobmaster/slotpool/SlotPoolTest.java   | 137 +++++++++++++++++----
 .../taskexecutor/TestingTaskExecutorGateway.java   |   7 +-
 .../TestingTaskExecutorGatewayBuilder.java         |   9 +-
 9 files changed, 347 insertions(+), 61 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
new file mode 100644
index 0000000..4ec75c5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * Serializable {@link Optional}.
+ */
+public final class SerializableOptional<T extends Serializable> implements Serializable {
+	private static final long serialVersionUID = -3312769593551775940L;
+
+	private static final SerializableOptional<?> EMPTY = new SerializableOptional<>(null);
+
+	@Nullable
+	private final T value;
+
+	private SerializableOptional(@Nullable T value) {
+		this.value = value;
+	}
+
+	public T get() {
+		if (value == null) {
+			throw new NoSuchElementException("No value present");
+		}
+		return value;
+	}
+
+	public boolean isPresent() {
+		return value != null;
+	}
+
+	public void ifPresent(Consumer<? super T> consumer) {
+		if (value != null) {
+			consumer.accept(value);
+		}
+	}
+
+	public static <T extends Serializable> SerializableOptional<T> of(@Nonnull T value) {
+		return new SerializableOptional<>(value);
+	}
+
+	@SuppressWarnings("unchecked")
+	public static <T extends Serializable> SerializableOptional<T> empty() {
+		return (SerializableOptional<T>) EMPTY;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 736984e..21e06af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -103,6 +103,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
@@ -833,13 +834,25 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			final Exception cause) {
 
 		if (registeredTaskManagers.containsKey(taskManagerId)) {
-			slotPoolGateway.failAllocation(allocationId, cause);
+			internalFailAllocation(allocationId, cause);
 		} else {
 			log.warn("Cannot fail slot " + allocationId + " because the TaskManager " +
 			taskManagerId + " is unknown.");
 		}
 	}
 
+	private void internalFailAllocation(AllocationID allocationId, Exception cause) {
+		final CompletableFuture<SerializableOptional<ResourceID>> emptyTaskExecutorFuture = slotPoolGateway.failAllocation(allocationId, cause);
+
+		emptyTaskExecutorFuture.thenAcceptAsync(
+			resourceIdOptional -> resourceIdOptional.ifPresent(this::releaseEmptyTaskManager),
+			getMainThreadExecutor());
+	}
+
+	private CompletableFuture<Acknowledge> releaseEmptyTaskManager(ResourceID resourceId) {
+		return disconnectTaskManager(resourceId, new FlinkException(String.format("No more slots registered at JobMaster %s.", resourceId)));
+	}
+
 	@Override
 	public CompletableFuture<RegistrationResponse> registerTaskManager(
 			final String taskManagerRpcAddress,
@@ -982,7 +995,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	@Override
 	public void notifyAllocationFailure(AllocationID allocationID, Exception cause) {
-		slotPoolGateway.failAllocation(allocationID, cause);
+		internalFailAllocation(allocationID, cause);
 	}
 
 	//----------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 13f0462..b53ee93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.clock.Clock;
 import org.apache.flink.runtime.util.clock.SystemClock;
+import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -1001,32 +1002,50 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	 * and decided to take it back.
 	 *
 	 * @param allocationID Represents the allocation which should be failed
-	 * @param cause        The cause of the failure
+	 * @param cause The cause of the failure
+	 * @return Optional task executor if it has no more slots registered
 	 */
 	@Override
-	public void failAllocation(final AllocationID allocationID, final Exception cause) {
+	public CompletableFuture<SerializableOptional<ResourceID>> failAllocation(final AllocationID allocationID, final Exception cause) {
 		final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
 		if (pendingRequest != null) {
 			// request was still pending
 			failPendingRequest(pendingRequest, cause);
-		}
-		else if (availableSlots.tryRemove(allocationID)) {
-			log.debug("Failed available slot [{}].", allocationID, cause);
+			return CompletableFuture.completedFuture(SerializableOptional.empty());
 		}
 		else {
-			AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID);
-			if (allocatedSlot != null) {
-				// release the slot.
-				// since it is not in 'allocatedSlots' any more, it will be dropped o return'
-				allocatedSlot.releasePayload(cause);
-			}
-			else {
-				log.trace("Outdated request to fail slot [{}].", allocationID, cause);
-			}
+			return tryFailingAllocatedSlot(allocationID, cause);
 		}
+
 		// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase
 	}
 
+	private CompletableFuture<SerializableOptional<ResourceID>> tryFailingAllocatedSlot(AllocationID allocationID, Exception cause) {
+		AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID);
+
+		if (allocatedSlot == null) {
+			allocatedSlot = allocatedSlots.remove(allocationID);
+		}
+
+		if (allocatedSlot != null) {
+			log.debug("Failed allocated slot [{}]: {}", allocationID, cause.getMessage());
+
+			// notify TaskExecutor about the failure
+			allocatedSlot.getTaskManagerGateway().freeSlot(allocationID, cause, rpcTimeout);
+			// release the slot.
+			// since it is not in 'allocatedSlots' any more, it will be dropped on return
+			allocatedSlot.releasePayload(cause);
+
+			final ResourceID taskManagerId = allocatedSlot.getTaskManagerId();
+
+			if (!availableSlots.containsTaskManager(taskManagerId) && !allocatedSlots.containResource(taskManagerId)) {
+				return CompletableFuture.completedFuture(SerializableOptional.of(taskManagerId));
+			}
+		}
+
+		return CompletableFuture.completedFuture(SerializableOptional.empty());
+	}
+
 	// ------------------------------------------------------------------------
 	//  Resource
 	// ------------------------------------------------------------------------
@@ -1107,7 +1126,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 
 		for (AllocatedSlot expiredSlot : expiredSlots) {
 			final AllocationID allocationID = expiredSlot.getAllocationId();
-			if (availableSlots.tryRemove(allocationID)) {
+			if (availableSlots.tryRemove(allocationID) != null) {
 
 				log.info("Releasing idle slot [{}].", allocationID);
 				final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
@@ -1502,7 +1521,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 			}
 		}
 
-		boolean tryRemove(AllocationID slotId) {
+		AllocatedSlot tryRemove(AllocationID slotId) {
 			final SlotAndTimestamp sat = availableSlots.remove(slotId);
 			if (sat != null) {
 				final AllocatedSlot slot = sat.slot();
@@ -1522,15 +1541,15 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 					availableSlotsByHost.remove(host);
 				}
 
-				return true;
+				return slot;
 			}
 			else {
-				return false;
+				return null;
 			}
 		}
 
 		private void remove(AllocationID slotId) throws IllegalStateException {
-			if (!tryRemove(slotId)) {
+			if (tryRemove(slotId) == null) {
 				throw new IllegalStateException("slot not contained");
 			}
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
index 34d9c7f..3e546ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.types.SerializableOptional;
 
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -126,8 +127,9 @@ public interface SlotPoolGateway extends AllocatedSlotActions, RpcGateway {
 	 *
 	 * @param allocationID identifying the slot which is being failed
 	 * @param cause of the failure
+	 * @return An optional task executor id if this task executor has no more slots registered
 	 */
-	void failAllocation(AllocationID allocationID, Exception cause);
+	CompletableFuture<SerializableOptional<ResourceID>> failAllocation(AllocationID allocationID, Exception cause);
 
 	// ------------------------------------------------------------------------
 	//  allocating and disposing slots
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 5c62a73..e53b480 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -34,9 +34,12 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTrace;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 
+import javax.annotation.Nonnull;
+
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 
@@ -54,6 +57,9 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 
 	private volatile BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction;
 
+	@Nonnull
+	private volatile BiConsumer<InstanceID, Exception> disconnectFromJobManagerConsumer = (ignoredA, ignoredB) -> {};
+
 	public SimpleAckingTaskManagerGateway() {
 		optSubmitConsumer = Optional.empty();
 		optCancelConsumer = Optional.empty();
@@ -71,13 +77,19 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 		this.freeSlotFunction = freeSlotFunction;
 	}
 
+	public void setDisconnectFromJobManagerConsumer(@Nonnull BiConsumer<InstanceID, Exception> disconnectFromJobManagerConsumer) {
+		this.disconnectFromJobManagerConsumer = disconnectFromJobManagerConsumer;
+	}
+
 	@Override
 	public String getAddress() {
 		return address;
 	}
 
 	@Override
-	public void disconnectFromJobManager(InstanceID instanceId, Exception cause) {}
+	public void disconnectFromJobManager(InstanceID instanceId, Exception cause) {
+		disconnectFromJobManagerConsumer.accept(instanceId, cause);
+	}
 
 	@Override
 	public void stopCluster(ApplicationStatus applicationStatus, String message) {}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 9a2bc97..462b1d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -550,20 +550,6 @@ public class JobMasterTest extends TestLogger {
 		}
 	}
 
-	private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException {
-		final JobVertex jobVertex = new JobVertex("Test vertex");
-		jobVertex.setInvokableClass(NoOpInvokable.class);
-
-		final ExecutionConfig executionConfig = new ExecutionConfig();
-		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
-
-		final JobGraph jobGraph = new JobGraph(jobVertex);
-		jobGraph.setAllowQueuedScheduling(true);
-		jobGraph.setExecutionConfig(executionConfig);
-
-		return jobGraph;
-	}
-
 	/**
 	 * Tests that we can close an unestablished ResourceManager connection.
 	 */
@@ -975,6 +961,72 @@ public class JobMasterTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the TaskExecutor is released if all of its slots have been freed.
+	 */
+	@Test
+	public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception {
+		final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
+
+		final JobGraph jobGraph = createSingleVertexJobWithRestartStrategy();
+
+		final JobMaster jobMaster = createJobMaster(
+			configuration,
+			jobGraph,
+			haServices,
+			jobManagerSharedServices,
+			heartbeatServices);
+
+		final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
+		rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
+		rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
+
+		final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+
+		testingResourceManagerGateway.setRequestSlotConsumer(
+			slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+		final CompletableFuture<JobID> disconnectTaskExecutorFuture = new CompletableFuture<>();
+		final CompletableFuture<AllocationID> freedSlotFuture = new CompletableFuture<>();
+		final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+			.setFreeSlotFunction(
+				(allocationID, throwable) -> {
+					freedSlotFuture.complete(allocationID);
+					return CompletableFuture.completedFuture(Acknowledge.get());
+				})
+			.setDisconnectJobManagerConsumer((jobID, throwable) -> disconnectTaskExecutorFuture.complete(jobID))
+			.createTestingTaskExecutorGateway();
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+		rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), testingTaskExecutorGateway);
+
+		try {
+			jobMaster.start(jobMasterId, testingTimeout).get();
+
+			final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+			final AllocationID allocationId = allocationIdFuture.get();
+
+			jobMasterGateway.registerTaskManager(testingTaskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout).get();
+
+			final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
+			final CompletableFuture<Collection<SlotOffer>> acceptedSlotOffers = jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout);
+
+			final Collection<SlotOffer> slotOffers = acceptedSlotOffers.get();
+
+			// check that we accepted the offered slot
+			assertThat(slotOffers, hasSize(1));
+
+			// now fail the allocation and check that we close the connection to the TaskExecutor
+			jobMasterGateway.notifyAllocationFailure(allocationId, new FlinkException("Fail alloction test exception"));
+
+			// we should free the slot and then disconnect from the TaskExecutor because we no longer use any slots from it
+			assertThat(freedSlotFuture.get(), equalTo(allocationId));
+			assertThat(disconnectTaskExecutorFuture.get(), equalTo(jobGraph.getJobID()));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+		}
+	}
+
 	private JobGraph producerConsumerJobGraph() {
 		final JobVertex producer = new JobVertex("Producer");
 		producer.setInvokableClass(NoOpInvokable.class);
@@ -1064,6 +1116,20 @@ public class JobMasterTest extends TestLogger {
 			JobMasterTest.class.getClassLoader());
 	}
 
+	private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException {
+		final JobVertex jobVertex = new JobVertex("Test vertex");
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+
+		final ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
+		final JobGraph jobGraph = new JobGraph(jobVertex);
+		jobGraph.setAllowQueuedScheduling(true);
+		jobGraph.setExecutionConfig(executionConfig);
+
+		return jobGraph;
+	}
+
 	/**
 	 * No op implementation of {@link OnCompletionActions}.
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index 9815cb2..3a9925c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -44,6 +45,7 @@ import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.clock.ManualClock;
+import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -59,7 +61,9 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -69,6 +73,8 @@ import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -692,12 +698,7 @@ public class SlotPoolTest extends TestLogger {
 
 			slotPool.triggerCheckIdleSlot();
 
-			CompletableFuture<LogicalSlot> allocatedSlotFuture = slotPoolGateway.allocateSlot(
-				new SlotRequestId(),
-				new DummyScheduledUnit(),
-				SlotProfile.noRequirements(),
-				true,
-				timeout);
+			CompletableFuture<LogicalSlot> allocatedSlotFuture = allocateSlot(slotPoolGateway, new SlotRequestId());
 
 			// wait until the slot has been fulfilled with the previously idling slot
 			final LogicalSlot logicalSlot = allocatedSlotFuture.get();
@@ -712,12 +713,7 @@ public class SlotPoolTest extends TestLogger {
 			slotPool.triggerCheckIdleSlot();
 
 			// request a new slot after the idling slot has been released
-			allocatedSlotFuture = slotPoolGateway.allocateSlot(
-				new SlotRequestId(),
-				new DummyScheduledUnit(),
-				SlotProfile.noRequirements(),
-				true,
-				timeout);
+			allocatedSlotFuture = allocateSlot(slotPoolGateway, new SlotRequestId());
 
 			// release the TaskExecutor before we get a response from the slot releasing
 			slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID(), null).get();
@@ -739,6 +735,114 @@ public class SlotPoolTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that failed slots are freed on the {@link TaskExecutor}.
+	 */
+	@Test
+	public void testFreeFailedSlots() throws Exception {
+		final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());
+
+		try {
+			final int parallelism = 5;
+			final ArrayBlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(parallelism);
+			resourceManagerGateway.setRequestSlotConsumer(
+				slotRequest -> allocationIds.offer(slotRequest.getAllocationId()));
+
+			final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
+
+			final Map<SlotRequestId, CompletableFuture<LogicalSlot>> slotRequestFutures = new HashMap<>(parallelism);
+
+			for (int i = 0; i < parallelism; i++) {
+				final SlotRequestId slotRequestId = new SlotRequestId();
+				slotRequestFutures.put(slotRequestId, allocateSlot(slotPoolGateway, slotRequestId));
+			}
+
+			final List<SlotOffer> slotOffers = new ArrayList<>(parallelism);
+
+			for (int i = 0; i < parallelism; i++) {
+				slotOffers.add(new SlotOffer(allocationIds.take(), i, ResourceProfile.UNKNOWN));
+			}
+
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
+			slotPoolGateway.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
+
+			// wait for the completion of all slot futures
+			FutureUtils.waitForAll(slotRequestFutures.values()).get();
+
+			final ArrayBlockingQueue<AllocationID> freedSlots = new ArrayBlockingQueue<>(1);
+			taskManagerGateway.setFreeSlotFunction(
+				(allocationID, throwable) -> {
+					freedSlots.offer(allocationID);
+					return CompletableFuture.completedFuture(Acknowledge.get());
+				});
+
+			final FlinkException failException = new FlinkException("Test fail exception");
+			// fail allocations one by one
+			for (int i = 0; i < parallelism - 1; i++) {
+				final SlotOffer slotOffer = slotOffers.get(i);
+				final CompletableFuture<SerializableOptional<ResourceID>> emptyTaskExecutorFuture = slotPoolGateway.failAllocation(
+					slotOffer.getAllocationId(),
+					failException);
+
+				assertThat(emptyTaskExecutorFuture.get().isPresent(), is(false));
+				assertThat(freedSlots.take(), is(equalTo(slotOffer.getAllocationId())));
+			}
+
+			final SlotOffer slotOffer = slotOffers.get(parallelism - 1);
+			final CompletableFuture<SerializableOptional<ResourceID>> emptyTaskExecutorFuture = slotPoolGateway.failAllocation(
+				slotOffer.getAllocationId(),
+				failException);
+			assertThat(emptyTaskExecutorFuture.get().get(), is(equalTo(taskManagerLocation.getResourceID())));
+			assertThat(freedSlots.take(), is(equalTo(slotOffer.getAllocationId())));
+
+		} finally {
+			RpcUtils.terminateRpcEndpoint(slotPool, timeout);
+		}
+	}
+
+	/**
+	 * Tests that failing an allocation fails the pending slot request.
+	 */
+	@Test
+	public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
+		final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());
+
+		try {
+			final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+			resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+			final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
+
+			final CompletableFuture<LogicalSlot> slotFuture = allocateSlot(slotPoolGateway, new SlotRequestId());
+
+			final AllocationID allocationId = allocationIdFuture.get();
+
+			assertThat(slotFuture.isDone(), is(false));
+
+			final FlinkException cause = new FlinkException("Fail pending slot request failure.");
+			final CompletableFuture<SerializableOptional<ResourceID>> responseFuture = slotPoolGateway.failAllocation(allocationId, cause);
+
+			assertThat(responseFuture.get().isPresent(), is(false));
+
+			try {
+				slotFuture.get();
+				fail("Expected a slot allocation failure.");
+			} catch (ExecutionException ee) {
+				assertThat(ExceptionUtils.stripExecutionException(ee), equalTo(cause));
+			}
+		} finally {
+			RpcUtils.terminateRpcEndpoint(slotPool, timeout);
+		}
+	}
+
+	private CompletableFuture<LogicalSlot> allocateSlot(SlotPoolGateway slotPoolGateway, SlotRequestId slotRequestId) {
+		return slotPoolGateway.allocateSlot(
+			slotRequestId,
+			new DummyScheduledUnit(),
+			SlotProfile.noRequirements(),
+			true,
+			timeout);
+	}
+
 	private static SlotPoolGateway setupSlotPool(
 			SlotPool slotPool,
 			ResourceManagerGateway resourceManagerGateway) throws Exception {
@@ -750,13 +854,4 @@ public class SlotPoolTest extends TestLogger {
 
 		return slotPool.getSelfGateway(SlotPoolGateway.class);
 	}
-
-	private AllocatedSlot createSlot(final AllocationID allocationId) {
-		return new AllocatedSlot(
-			allocationId,
-			taskManagerLocation,
-			0,
-			ResourceProfile.UNKNOWN,
-			taskManagerGateway);
-	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index a9e9949..912de36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -59,13 +59,16 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	private final Function<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>, CompletableFuture<Acknowledge>> requestSlotFunction;
 
-	TestingTaskExecutorGateway(String address, String hostname, Consumer<ResourceID> heartbeatJobManagerConsumer, BiConsumer<JobID, Throwable> disconnectJobManagerConsumer, BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer, Function<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>, CompletableFuture<Acknowledge>> requestSlotFunction) {
+	private final BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction;
+
+	TestingTaskExecutorGateway(String address, String hostname, Consumer<ResourceID> heartbeatJobManagerConsumer, BiConsumer<JobID, Throwable> disconnectJobManagerConsumer, BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer, Function<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>, CompletableFuture<Acknowledge>> requestSlotFunction, BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction) {
 		this.address = Preconditions.checkNotNull(address);
 		this.hostname = Preconditions.checkNotNull(hostname);
 		this.heartbeatJobManagerConsumer = Preconditions.checkNotNull(heartbeatJobManagerConsumer);
 		this.disconnectJobManagerConsumer = Preconditions.checkNotNull(disconnectJobManagerConsumer);
 		this.submitTaskConsumer = Preconditions.checkNotNull(submitTaskConsumer);
 		this.requestSlotFunction = Preconditions.checkNotNull(requestSlotFunction);
+		this.freeSlotFunction = Preconditions.checkNotNull(freeSlotFunction);
 	}
 
 	@Override
@@ -141,7 +144,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	@Override
 	public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
-		return CompletableFuture.completedFuture(Acknowledge.get());
+		return freeSlotFunction.apply(allocationId, cause);
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
index 1c2f132..e59eefd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
@@ -43,6 +43,7 @@ public class TestingTaskExecutorGatewayBuilder {
 	private static final BiConsumer<JobID, Throwable> NOOP_DISCONNECT_JOBMANAGER_CONSUMER = (ignoredA, ignoredB) -> {};
 	private static final BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> NOOP_SUBMIT_TASK_CONSUMER = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(Acknowledge.get());
 	private static final Function<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>, CompletableFuture<Acknowledge>> NOOP_REQUEST_SLOT_FUNCTION = ignored -> CompletableFuture.completedFuture(Acknowledge.get());
+	private static final BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> NOOP_FREE_SLOT_FUNCTION = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(Acknowledge.get());
 
 	private String address = "foobar:1234";
 	private String hostname = "foobar";
@@ -50,6 +51,7 @@ public class TestingTaskExecutorGatewayBuilder {
 	private BiConsumer<JobID, Throwable> disconnectJobManagerConsumer = NOOP_DISCONNECT_JOBMANAGER_CONSUMER;
 	private BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer = NOOP_SUBMIT_TASK_CONSUMER;
 	private Function<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>, CompletableFuture<Acknowledge>> requestSlotFunction = NOOP_REQUEST_SLOT_FUNCTION;
+	private BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction = NOOP_FREE_SLOT_FUNCTION;
 
 	public TestingTaskExecutorGatewayBuilder setAddress(String address) {
 		this.address = address;
@@ -81,7 +83,12 @@ public class TestingTaskExecutorGatewayBuilder {
 		return this;
 	}
 
+	public TestingTaskExecutorGatewayBuilder setFreeSlotFunction(BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction) {
+		this.freeSlotFunction = freeSlotFunction;
+		return this;
+	}
+
 	public TestingTaskExecutorGateway createTestingTaskExecutorGateway() {
-		return new TestingTaskExecutorGateway(address, hostname, heartbeatJobManagerConsumer, disconnectJobManagerConsumer, submitTaskConsumer, requestSlotFunction);
+		return new TestingTaskExecutorGateway(address, hostname, heartbeatJobManagerConsumer, disconnectJobManagerConsumer, submitTaskConsumer, requestSlotFunction, freeSlotFunction);
 	}
 }