You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/17 13:54:18 UTC

[GitHub] asfgit closed pull request #6394: [FLINK-9912][JM] Release TaskExecutors if they have no slots registered at SlotPool

asfgit closed pull request #6394: [FLINK-9912][JM] Release TaskExecutors if they have no slots registered at SlotPool
URL: https://github.com/apache/flink/pull/6394
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
new file mode 100644
index 00000000000..4ec75c51a84
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * Serializable {@link Optional}.
+ */
+public final class SerializableOptional<T extends Serializable> implements Serializable {
+	private static final long serialVersionUID = -3312769593551775940L;
+
+	private static final SerializableOptional<?> EMPTY = new SerializableOptional<>(null);
+
+	@Nullable
+	private final T value;
+
+	private SerializableOptional(@Nullable T value) {
+		this.value = value;
+	}
+
+	public T get() {
+		if (value == null) {
+			throw new NoSuchElementException("No value present");
+		}
+		return value;
+	}
+
+	public boolean isPresent() {
+		return value != null;
+	}
+
+	public void ifPresent(Consumer<? super T> consumer) {
+		if (value != null) {
+			consumer.accept(value);
+		}
+	}
+
+	public static <T extends Serializable> SerializableOptional<T> of(@Nonnull T value) {
+		return new SerializableOptional<>(value);
+	}
+
+	@SuppressWarnings("unchecked")
+	public static <T extends Serializable> SerializableOptional<T> empty() {
+		return (SerializableOptional<T>) EMPTY;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 736984e88e7..21e06af30d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -103,6 +103,7 @@
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
@@ -833,13 +834,25 @@ public void failSlot(
 			final Exception cause) {
 
 		if (registeredTaskManagers.containsKey(taskManagerId)) {
-			slotPoolGateway.failAllocation(allocationId, cause);
+			internalFailAllocation(allocationId, cause);
 		} else {
 			log.warn("Cannot fail slot " + allocationId + " because the TaskManager " +
 			taskManagerId + " is unknown.");
 		}
 	}
 
+	private void internalFailAllocation(AllocationID allocationId, Exception cause) {
+		final CompletableFuture<SerializableOptional<ResourceID>> emptyTaskExecutorFuture = slotPoolGateway.failAllocation(allocationId, cause);
+
+		emptyTaskExecutorFuture.thenAcceptAsync(
+			resourceIdOptional -> resourceIdOptional.ifPresent(this::releaseEmptyTaskManager),
+			getMainThreadExecutor());
+	}
+
+	private CompletableFuture<Acknowledge> releaseEmptyTaskManager(ResourceID resourceId) {
+		return disconnectTaskManager(resourceId, new FlinkException(String.format("No more slots registered at JobMaster %s.", resourceId)));
+	}
+
 	@Override
 	public CompletableFuture<RegistrationResponse> registerTaskManager(
 			final String taskManagerRpcAddress,
@@ -982,7 +995,7 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor
 
 	@Override
 	public void notifyAllocationFailure(AllocationID allocationID, Exception cause) {
-		slotPoolGateway.failAllocation(allocationID, cause);
+		internalFailAllocation(allocationID, cause);
 	}
 
 	//----------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 13f0462455c..b53ee93e643 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -50,6 +50,7 @@
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.clock.Clock;
 import org.apache.flink.runtime.util.clock.SystemClock;
+import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -1001,32 +1002,50 @@ private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) {
 	 * and decided to take it back.
 	 *
 	 * @param allocationID Represents the allocation which should be failed
-	 * @param cause        The cause of the failure
+	 * @param cause The cause of the failure
+	 * @return Optional task executor if it has no more slots registered
 	 */
 	@Override
-	public void failAllocation(final AllocationID allocationID, final Exception cause) {
+	public CompletableFuture<SerializableOptional<ResourceID>> failAllocation(final AllocationID allocationID, final Exception cause) {
 		final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
 		if (pendingRequest != null) {
 			// request was still pending
 			failPendingRequest(pendingRequest, cause);
-		}
-		else if (availableSlots.tryRemove(allocationID)) {
-			log.debug("Failed available slot [{}].", allocationID, cause);
+			return CompletableFuture.completedFuture(SerializableOptional.empty());
 		}
 		else {
-			AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID);
-			if (allocatedSlot != null) {
-				// release the slot.
-				// since it is not in 'allocatedSlots' any more, it will be dropped o return'
-				allocatedSlot.releasePayload(cause);
-			}
-			else {
-				log.trace("Outdated request to fail slot [{}].", allocationID, cause);
-			}
+			return tryFailingAllocatedSlot(allocationID, cause);
 		}
+
 		// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase
 	}
 
+	private CompletableFuture<SerializableOptional<ResourceID>> tryFailingAllocatedSlot(AllocationID allocationID, Exception cause) {
+		AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID);
+
+		if (allocatedSlot == null) {
+			allocatedSlot = allocatedSlots.remove(allocationID);
+		}
+
+		if (allocatedSlot != null) {
+			log.debug("Failed allocated slot [{}]: {}", allocationID, cause.getMessage());
+
+			// notify TaskExecutor about the failure
+			allocatedSlot.getTaskManagerGateway().freeSlot(allocationID, cause, rpcTimeout);
+			// release the slot.
+			// since it is not in 'allocatedSlots' any more, it will be dropped on return
+			allocatedSlot.releasePayload(cause);
+
+			final ResourceID taskManagerId = allocatedSlot.getTaskManagerId();
+
+			if (!availableSlots.containsTaskManager(taskManagerId) && !allocatedSlots.containResource(taskManagerId)) {
+				return CompletableFuture.completedFuture(SerializableOptional.of(taskManagerId));
+			}
+		}
+
+		return CompletableFuture.completedFuture(SerializableOptional.empty());
+	}
+
 	// ------------------------------------------------------------------------
 	//  Resource
 	// ------------------------------------------------------------------------
@@ -1107,7 +1126,7 @@ private void checkIdleSlot() {
 
 		for (AllocatedSlot expiredSlot : expiredSlots) {
 			final AllocationID allocationID = expiredSlot.getAllocationId();
-			if (availableSlots.tryRemove(allocationID)) {
+			if (availableSlots.tryRemove(allocationID) != null) {
 
 				log.info("Releasing idle slot [{}].", allocationID);
 				final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
@@ -1502,7 +1521,7 @@ SlotAndLocality poll(SchedulingStrategy schedulingStrategy, SlotProfile slotProf
 			}
 		}
 
-		boolean tryRemove(AllocationID slotId) {
+		AllocatedSlot tryRemove(AllocationID slotId) {
 			final SlotAndTimestamp sat = availableSlots.remove(slotId);
 			if (sat != null) {
 				final AllocatedSlot slot = sat.slot();
@@ -1522,15 +1541,15 @@ boolean tryRemove(AllocationID slotId) {
 					availableSlotsByHost.remove(host);
 				}
 
-				return true;
+				return slot;
 			}
 			else {
-				return false;
+				return null;
 			}
 		}
 
 		private void remove(AllocationID slotId) throws IllegalStateException {
-			if (!tryRemove(slotId)) {
+			if (tryRemove(slotId) == null) {
 				throw new IllegalStateException("slot not contained");
 			}
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
index 34d9c7ff601..3e546ff3674 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
@@ -34,6 +34,7 @@
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.types.SerializableOptional;
 
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -126,8 +127,9 @@
 	 *
 	 * @param allocationID identifying the slot which is being failed
 	 * @param cause of the failure
+	 * @return An optional task executor id if this task executor has no more slots registered
 	 */
-	void failAllocation(AllocationID allocationID, Exception cause);
+	CompletableFuture<SerializableOptional<ResourceID>> failAllocation(AllocationID allocationID, Exception cause);
 
 	// ------------------------------------------------------------------------
 	//  allocating and disposing slots
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index afcd24f1064..ef288a26469 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -36,7 +36,6 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
 
 import java.util.AbstractCollection;
 import java.util.Collection;
@@ -82,9 +81,6 @@
 
 	private static final Logger LOG = LoggerFactory.getLogger(SlotSharingManager.class);
 
-	/** Lock for the internal data structures. */
-	private final Object lock = new Object();
-
 	private final SlotSharingGroupId slotSharingGroupId;
 
 	/** Actions to release allocated slots after a complete multi task slot hierarchy has been released. */
@@ -96,11 +92,9 @@
 	private final Map<SlotRequestId, TaskSlot> allTaskSlots;
 
 	/** Root nodes which have not been completed because the allocated slot is still pending. */
-	@GuardedBy("lock")
 	private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
 
 	/** Root nodes which have been completed (the underlying allocated slot has been assigned). */
-	@GuardedBy("lock")
 	private final Map<TaskManagerLocation, Set<MultiTaskSlot>> resolvedRootSlots;
 
 	SlotSharingManager(
@@ -152,27 +146,23 @@ MultiTaskSlot createRootSlot(
 
 		allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
 
-		synchronized (lock) {
-			unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
-		}
+		unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
 
 		// add the root node to the set of resolved root nodes once the SlotContext future has
 		// been completed and we know the slot's TaskManagerLocation
 		slotContextFuture.whenComplete(
 			(SlotContext slotContext, Throwable throwable) -> {
 				if (slotContext != null) {
-					synchronized (lock) {
-						final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
+					final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
 
-						if (resolvedRootNode != null) {
-							LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
+					if (resolvedRootNode != null) {
+						LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
 
-							final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
-								slotContext.getTaskManagerLocation(),
-								taskManagerLocation -> new HashSet<>(4));
+						final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
+							slotContext.getTaskManagerLocation(),
+							taskManagerLocation -> new HashSet<>(4));
 
-							innerCollection.add(resolvedRootNode);
-						}
+						innerCollection.add(resolvedRootNode);
 					}
 				} else {
 					rootMultiTaskSlot.release(throwable);
@@ -193,15 +183,13 @@ MultiTaskSlot createRootSlot(
 	 */
 	@Nullable
 	MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, SchedulingStrategy matcher, SlotProfile slotProfile) {
-		synchronized (lock) {
-			Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = this.resolvedRootSlots.values();
-			return matcher.findMatchWithLocality(
-				slotProfile,
-				resolvedRootSlotsValues.stream().flatMap(Collection::stream),
-				(MultiTaskSlot multiTaskSlot) -> multiTaskSlot.getSlotContextFuture().join(),
-				(MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId),
-				MultiTaskSlotLocality::of);
-		}
+		Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = this.resolvedRootSlots.values();
+		return matcher.findMatchWithLocality(
+			slotProfile,
+			resolvedRootSlotsValues.stream().flatMap(Collection::stream),
+			(MultiTaskSlot multiTaskSlot) -> multiTaskSlot.getSlotContextFuture().join(),
+			(MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId),
+			MultiTaskSlotLocality::of);
 	}
 
 	/**
@@ -213,11 +201,9 @@ MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, SchedulingStrategy
 	 */
 	@Nullable
 	MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
-		synchronized (lock) {
-			for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) {
-				if (!multiTaskSlot.contains(groupId)) {
-					return multiTaskSlot;
-				}
+		for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) {
+			if (!multiTaskSlot.contains(groupId)) {
+				return multiTaskSlot;
 			}
 		}
 
@@ -228,11 +214,9 @@ MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
 	public String toString() {
 		final StringBuilder builder = new StringBuilder("{\n\tgroupId=").append(slotSharingGroupId).append('\n');
 
-		synchronized (lock) {
-			builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
-			builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
-			builder.append("\tall=").append(allTaskSlots).append('\n');
-		}
+		builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
+		builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
+		builder.append("\tall=").append(allTaskSlots).append('\n');
 
 		return builder.append('}').toString();
 	}
@@ -479,26 +463,20 @@ public void release(Throwable cause) {
 				parent.releaseChild(getGroupId());
 			} else if (allTaskSlots.remove(getSlotRequestId()) != null) {
 				// we are the root node --> remove the root node from the list of task slots
+				final MultiTaskSlot unresolvedRootSlot = unresolvedRootSlots.remove(getSlotRequestId());
 
-				if (!slotContextFuture.isDone() || slotContextFuture.isCompletedExceptionally()) {
-					synchronized (lock) {
-						// the root node should still be unresolved
-						unresolvedRootSlots.remove(getSlotRequestId());
-					}
-				} else {
+				if (unresolvedRootSlot == null) {
 					// the root node should be resolved --> we can access the slot context
 					final SlotContext slotContext = slotContextFuture.getNow(null);
 
 					if (slotContext != null) {
-						synchronized (lock) {
-							final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
+						final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
 
-							if (multiTaskSlots != null) {
-								multiTaskSlots.remove(this);
+						if (multiTaskSlots != null) {
+							multiTaskSlots.remove(this);
 
-								if (multiTaskSlots.isEmpty()) {
-									resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
-								}
+							if (multiTaskSlots.isEmpty()) {
+								resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
 							}
 						}
 					}
@@ -637,9 +615,7 @@ public String toString() {
 
 	@VisibleForTesting
 	Collection<MultiTaskSlot> getUnresolvedRootSlots() {
-		synchronized (lock) {
-			return unresolvedRootSlots.values();
-		}
+		return unresolvedRootSlots.values();
 	}
 
 	/**
@@ -649,19 +625,15 @@ public String toString() {
 
 		@Override
 		public Iterator<MultiTaskSlot> iterator() {
-			synchronized (lock) {
-				return new ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
-			}
+			return new ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
 		}
 
 		@Override
 		public int size() {
 			int numberResolvedMultiTaskSlots = 0;
 
-			synchronized (lock) {
-				for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) {
-					numberResolvedMultiTaskSlots += multiTaskSlots.size();
-				}
+			for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) {
+				numberResolvedMultiTaskSlots += multiTaskSlots.size();
 			}
 
 			return numberResolvedMultiTaskSlots;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 5c62a737096..e53b48021ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -34,9 +34,12 @@
 import org.apache.flink.runtime.messages.StackTrace;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 
+import javax.annotation.Nonnull;
+
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 
@@ -54,6 +57,9 @@
 
 	private volatile BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction;
 
+	@Nonnull
+	private volatile BiConsumer<InstanceID, Exception> disconnectFromJobManagerConsumer = (ignoredA, ignoredB) -> {};
+
 	public SimpleAckingTaskManagerGateway() {
 		optSubmitConsumer = Optional.empty();
 		optCancelConsumer = Optional.empty();
@@ -71,13 +77,19 @@ public void setFreeSlotFunction(BiFunction<AllocationID, Throwable, CompletableF
 		this.freeSlotFunction = freeSlotFunction;
 	}
 
+	public void setDisconnectFromJobManagerConsumer(@Nonnull BiConsumer<InstanceID, Exception> disconnectFromJobManagerConsumer) {
+		this.disconnectFromJobManagerConsumer = disconnectFromJobManagerConsumer;
+	}
+
 	@Override
 	public String getAddress() {
 		return address;
 	}
 
 	@Override
-	public void disconnectFromJobManager(InstanceID instanceId, Exception cause) {}
+	public void disconnectFromJobManager(InstanceID instanceId, Exception cause) {
+		disconnectFromJobManagerConsumer.accept(instanceId, cause);
+	}
 
 	@Override
 	public void stopCluster(ApplicationStatus applicationStatus, String message) {}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 9a2bc97b62b..462b1d16da9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -550,20 +550,6 @@ public void testSlotRequestTimeoutWhenNoSlotOffering() throws Exception {
 		}
 	}
 
-	private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException {
-		final JobVertex jobVertex = new JobVertex("Test vertex");
-		jobVertex.setInvokableClass(NoOpInvokable.class);
-
-		final ExecutionConfig executionConfig = new ExecutionConfig();
-		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
-
-		final JobGraph jobGraph = new JobGraph(jobVertex);
-		jobGraph.setAllowQueuedScheduling(true);
-		jobGraph.setExecutionConfig(executionConfig);
-
-		return jobGraph;
-	}
-
 	/**
 	 * Tests that we can close an unestablished ResourceManager connection.
 	 */
@@ -975,6 +961,72 @@ public void testTriggerSavepointTimeout() throws Exception {
 		}
 	}
 
+	/**
+	 * Tests that the TaskExecutor is released if all of its slots have been freed.
+	 */
+	@Test
+	public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception {
+		final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
+
+		final JobGraph jobGraph = createSingleVertexJobWithRestartStrategy();
+
+		final JobMaster jobMaster = createJobMaster(
+			configuration,
+			jobGraph,
+			haServices,
+			jobManagerSharedServices,
+			heartbeatServices);
+
+		final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
+		rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
+		rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
+
+		final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+
+		testingResourceManagerGateway.setRequestSlotConsumer(
+			slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+		final CompletableFuture<JobID> disconnectTaskExecutorFuture = new CompletableFuture<>();
+		final CompletableFuture<AllocationID> freedSlotFuture = new CompletableFuture<>();
+		final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+			.setFreeSlotFunction(
+				(allocationID, throwable) -> {
+					freedSlotFuture.complete(allocationID);
+					return CompletableFuture.completedFuture(Acknowledge.get());
+				})
+			.setDisconnectJobManagerConsumer((jobID, throwable) -> disconnectTaskExecutorFuture.complete(jobID))
+			.createTestingTaskExecutorGateway();
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+		rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), testingTaskExecutorGateway);
+
+		try {
+			jobMaster.start(jobMasterId, testingTimeout).get();
+
+			final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+			final AllocationID allocationId = allocationIdFuture.get();
+
+			jobMasterGateway.registerTaskManager(testingTaskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout).get();
+
+			final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
+			final CompletableFuture<Collection<SlotOffer>> acceptedSlotOffers = jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout);
+
+			final Collection<SlotOffer> slotOffers = acceptedSlotOffers.get();
+
+			// check that we accepted the offered slot
+			assertThat(slotOffers, hasSize(1));
+
+			// now fail the allocation and check that we close the connection to the TaskExecutor
+			jobMasterGateway.notifyAllocationFailure(allocationId, new FlinkException("Fail alloction test exception"));
+
+			// we should free the slot and then disconnect from the TaskExecutor because we no longer use any slots from it
+			assertThat(freedSlotFuture.get(), equalTo(allocationId));
+			assertThat(disconnectTaskExecutorFuture.get(), equalTo(jobGraph.getJobID()));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+		}
+	}
+
 	private JobGraph producerConsumerJobGraph() {
 		final JobVertex producer = new JobVertex("Producer");
 		producer.setInvokableClass(NoOpInvokable.class);
@@ -1064,6 +1116,20 @@ private JobMaster createJobMaster(
 			JobMasterTest.class.getClassLoader());
 	}
 
+	private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException {
+		final JobVertex jobVertex = new JobVertex("Test vertex");
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+
+		final ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
+		final JobGraph jobGraph = new JobGraph(jobVertex);
+		jobGraph.setAllowQueuedScheduling(true);
+		jobGraph.setExecutionConfig(executionConfig);
+
+		return jobGraph;
+	}
+
 	/**
 	 * No op implementation of {@link OnCompletionActions}.
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index 9815cb289da..3a9925c4f20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -44,6 +45,7 @@
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.clock.ManualClock;
+import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -59,7 +61,9 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -69,6 +73,8 @@
 
 import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -692,12 +698,7 @@ public void testReleasingIdleSlotFailed() throws Exception {
 
 			slotPool.triggerCheckIdleSlot();
 
-			CompletableFuture<LogicalSlot> allocatedSlotFuture = slotPoolGateway.allocateSlot(
-				new SlotRequestId(),
-				new DummyScheduledUnit(),
-				SlotProfile.noRequirements(),
-				true,
-				timeout);
+			CompletableFuture<LogicalSlot> allocatedSlotFuture = allocateSlot(slotPoolGateway, new SlotRequestId());
 
 			// wait until the slot has been fulfilled with the previously idling slot
 			final LogicalSlot logicalSlot = allocatedSlotFuture.get();
@@ -712,12 +713,7 @@ public void testReleasingIdleSlotFailed() throws Exception {
 			slotPool.triggerCheckIdleSlot();
 
 			// request a new slot after the idling slot has been released
-			allocatedSlotFuture = slotPoolGateway.allocateSlot(
-				new SlotRequestId(),
-				new DummyScheduledUnit(),
-				SlotProfile.noRequirements(),
-				true,
-				timeout);
+			allocatedSlotFuture = allocateSlot(slotPoolGateway, new SlotRequestId());
 
 			// release the TaskExecutor before we get a response from the slot releasing
 			slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID(), null).get();
@@ -739,6 +735,114 @@ public void testReleasingIdleSlotFailed() throws Exception {
 		}
 	}
 
+	/**
+	 * Tests that failed slots are freed on the {@link TaskExecutor}.
+	 */
+	@Test
+	public void testFreeFailedSlots() throws Exception {
+		final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());
+
+		try {
+			final int parallelism = 5;
+			final ArrayBlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(parallelism);
+			resourceManagerGateway.setRequestSlotConsumer(
+				slotRequest -> allocationIds.offer(slotRequest.getAllocationId()));
+
+			final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
+
+			final Map<SlotRequestId, CompletableFuture<LogicalSlot>> slotRequestFutures = new HashMap<>(parallelism);
+
+			for (int i = 0; i < parallelism; i++) {
+				final SlotRequestId slotRequestId = new SlotRequestId();
+				slotRequestFutures.put(slotRequestId, allocateSlot(slotPoolGateway, slotRequestId));
+			}
+
+			final List<SlotOffer> slotOffers = new ArrayList<>(parallelism);
+
+			for (int i = 0; i < parallelism; i++) {
+				slotOffers.add(new SlotOffer(allocationIds.take(), i, ResourceProfile.UNKNOWN));
+			}
+
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
+			slotPoolGateway.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
+
+			// wait for the completion of all slot futures
+			FutureUtils.waitForAll(slotRequestFutures.values()).get();
+
+			final ArrayBlockingQueue<AllocationID> freedSlots = new ArrayBlockingQueue<>(1);
+			taskManagerGateway.setFreeSlotFunction(
+				(allocationID, throwable) -> {
+					freedSlots.offer(allocationID);
+					return CompletableFuture.completedFuture(Acknowledge.get());
+				});
+
+			final FlinkException failException = new FlinkException("Test fail exception");
+			// fail allocations one by one
+			for (int i = 0; i < parallelism - 1; i++) {
+				final SlotOffer slotOffer = slotOffers.get(i);
+				final CompletableFuture<SerializableOptional<ResourceID>> emptyTaskExecutorFuture = slotPoolGateway.failAllocation(
+					slotOffer.getAllocationId(),
+					failException);
+
+				assertThat(emptyTaskExecutorFuture.get().isPresent(), is(false));
+				assertThat(freedSlots.take(), is(equalTo(slotOffer.getAllocationId())));
+			}
+
+			final SlotOffer slotOffer = slotOffers.get(parallelism - 1);
+			final CompletableFuture<SerializableOptional<ResourceID>> emptyTaskExecutorFuture = slotPoolGateway.failAllocation(
+				slotOffer.getAllocationId(),
+				failException);
+			assertThat(emptyTaskExecutorFuture.get().get(), is(equalTo(taskManagerLocation.getResourceID())));
+			assertThat(freedSlots.take(), is(equalTo(slotOffer.getAllocationId())));
+
+		} finally {
+			RpcUtils.terminateRpcEndpoint(slotPool, timeout);
+		}
+	}
+
+	/**
+	 * Tests that failing an allocation fails the pending slot request.
+	 */
+	@Test
+	public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
+		final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());
+
+		try {
+			final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+			resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+			final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
+
+			final CompletableFuture<LogicalSlot> slotFuture = allocateSlot(slotPoolGateway, new SlotRequestId());
+
+			final AllocationID allocationId = allocationIdFuture.get();
+
+			assertThat(slotFuture.isDone(), is(false));
+
+			final FlinkException cause = new FlinkException("Fail pending slot request failure.");
+			final CompletableFuture<SerializableOptional<ResourceID>> responseFuture = slotPoolGateway.failAllocation(allocationId, cause);
+
+			assertThat(responseFuture.get().isPresent(), is(false));
+
+			try {
+				slotFuture.get();
+				fail("Expected a slot allocation failure.");
+			} catch (ExecutionException ee) {
+				assertThat(ExceptionUtils.stripExecutionException(ee), equalTo(cause));
+			}
+		} finally {
+			RpcUtils.terminateRpcEndpoint(slotPool, timeout);
+		}
+	}
+
+	private CompletableFuture<LogicalSlot> allocateSlot(SlotPoolGateway slotPoolGateway, SlotRequestId slotRequestId) {
+		return slotPoolGateway.allocateSlot(
+			slotRequestId,
+			new DummyScheduledUnit(),
+			SlotProfile.noRequirements(),
+			true,
+			timeout);
+	}
+
 	private static SlotPoolGateway setupSlotPool(
 			SlotPool slotPool,
 			ResourceManagerGateway resourceManagerGateway) throws Exception {
@@ -750,13 +854,4 @@ private static SlotPoolGateway setupSlotPool(
 
 		return slotPool.getSelfGateway(SlotPoolGateway.class);
 	}
-
-	private AllocatedSlot createSlot(final AllocationID allocationId) {
-		return new AllocatedSlot(
-			allocationId,
-			taskManagerLocation,
-			0,
-			ResourceProfile.UNKNOWN,
-			taskManagerGateway);
-	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index a9e99495e34..912de36c881 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -59,13 +59,16 @@
 
 	private final Function<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>, CompletableFuture<Acknowledge>> requestSlotFunction;
 
-	TestingTaskExecutorGateway(String address, String hostname, Consumer<ResourceID> heartbeatJobManagerConsumer, BiConsumer<JobID, Throwable> disconnectJobManagerConsumer, BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer, Function<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>, CompletableFuture<Acknowledge>> requestSlotFunction) {
+	private final BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction;
+
+	TestingTaskExecutorGateway(String address, String hostname, Consumer<ResourceID> heartbeatJobManagerConsumer, BiConsumer<JobID, Throwable> disconnectJobManagerConsumer, BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer, Function<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>, CompletableFuture<Acknowledge>> requestSlotFunction, BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction) {
 		this.address = Preconditions.checkNotNull(address);
 		this.hostname = Preconditions.checkNotNull(hostname);
 		this.heartbeatJobManagerConsumer = Preconditions.checkNotNull(heartbeatJobManagerConsumer);
 		this.disconnectJobManagerConsumer = Preconditions.checkNotNull(disconnectJobManagerConsumer);
 		this.submitTaskConsumer = Preconditions.checkNotNull(submitTaskConsumer);
 		this.requestSlotFunction = Preconditions.checkNotNull(requestSlotFunction);
+		this.freeSlotFunction = Preconditions.checkNotNull(freeSlotFunction);
 	}
 
 	@Override
@@ -141,7 +144,7 @@ public void disconnectResourceManager(Exception cause) {
 
 	@Override
 	public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
-		return CompletableFuture.completedFuture(Acknowledge.get());
+		return freeSlotFunction.apply(allocationId, cause);
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
index 1c2f1328a5a..e59eefd0c8d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
@@ -43,6 +43,7 @@
 	private static final BiConsumer<JobID, Throwable> NOOP_DISCONNECT_JOBMANAGER_CONSUMER = (ignoredA, ignoredB) -> {};
 	private static final BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> NOOP_SUBMIT_TASK_CONSUMER = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(Acknowledge.get());
 	private static final Function<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>, CompletableFuture<Acknowledge>> NOOP_REQUEST_SLOT_FUNCTION = ignored -> CompletableFuture.completedFuture(Acknowledge.get());
+	private static final BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> NOOP_FREE_SLOT_FUNCTION = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(Acknowledge.get());
 
 	private String address = "foobar:1234";
 	private String hostname = "foobar";
@@ -50,6 +51,7 @@
 	private BiConsumer<JobID, Throwable> disconnectJobManagerConsumer = NOOP_DISCONNECT_JOBMANAGER_CONSUMER;
 	private BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer = NOOP_SUBMIT_TASK_CONSUMER;
 	private Function<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>, CompletableFuture<Acknowledge>> requestSlotFunction = NOOP_REQUEST_SLOT_FUNCTION;
+	private BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction = NOOP_FREE_SLOT_FUNCTION;
 
 	public TestingTaskExecutorGatewayBuilder setAddress(String address) {
 		this.address = address;
@@ -81,7 +83,12 @@ public TestingTaskExecutorGatewayBuilder setRequestSlotFunction(Function<Tuple5<
 		return this;
 	}
 
+	public TestingTaskExecutorGatewayBuilder setFreeSlotFunction(BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction) {
+		this.freeSlotFunction = freeSlotFunction;
+		return this;
+	}
+
 	public TestingTaskExecutorGateway createTestingTaskExecutorGateway() {
-		return new TestingTaskExecutorGateway(address, hostname, heartbeatJobManagerConsumer, disconnectJobManagerConsumer, submitTaskConsumer, requestSlotFunction);
+		return new TestingTaskExecutorGateway(address, hostname, heartbeatJobManagerConsumer, disconnectJobManagerConsumer, submitTaskConsumer, requestSlotFunction, freeSlotFunction);
 	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services