You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/12 19:05:36 UTC

[flink] 09/10: [FLINK-12765][jobmanager] Let some slot requests fail if the sharing slot is oversubscribed

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a587576ef513f1df5a3c19b0e01a9eb43ec6f80e
Author: Gao Yun <yu...@alibaba-inc.com>
AuthorDate: Sat Jun 22 00:13:56 2019 +0800

    [FLINK-12765][jobmanager] Let some slot requests fail if the sharing slot is oversubscribed
---
 .../runtime/jobmaster/slotpool/SchedulerImpl.java  |  53 ++++-
 .../SharedSlotOversubscribedException.java         |  40 ++++
 .../jobmaster/slotpool/SlotSharingManager.java     |  89 ++++++++-
 .../jobmaster/slotpool/SlotPoolCoLocationTest.java | 162 +++++++++++++++
 .../slotpool/SlotPoolSlotSharingTest.java          |  92 +++++++++
 .../jobmaster/slotpool/SlotSharingManagerTest.java | 219 +++++++++++++++++++++
 6 files changed, 640 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
index 57c42b8..ef87dd1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
@@ -145,24 +146,53 @@ public class SchedulerImpl implements Scheduler {
 		componentMainThreadExecutor.assertRunningInMainThread();
 
 		final CompletableFuture<LogicalSlot> allocationResultFuture = new CompletableFuture<>();
+		internalAllocateSlot(
+				allocationResultFuture,
+				slotRequestId,
+				scheduledUnit,
+				slotProfile,
+				allowQueuedScheduling,
+				allocationTimeout);
+		return allocationResultFuture;
+	}
 
+	private void internalAllocateSlot(
+			CompletableFuture<LogicalSlot> allocationResultFuture,
+			SlotRequestId slotRequestId,
+			ScheduledUnit scheduledUnit,
+			SlotProfile slotProfile,
+			boolean allowQueuedScheduling,
+			Time allocationTimeout) {
 		CompletableFuture<LogicalSlot> allocationFuture = scheduledUnit.getSlotSharingGroupId() == null ?
 			allocateSingleSlot(slotRequestId, slotProfile, allowQueuedScheduling, allocationTimeout) :
 			allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout);
 
 		allocationFuture.whenComplete((LogicalSlot slot, Throwable failure) -> {
 			if (failure != null) {
-				cancelSlotRequest(
-					slotRequestId,
-					scheduledUnit.getSlotSharingGroupId(),
-					failure);
-				allocationResultFuture.completeExceptionally(failure);
+				Optional<SharedSlotOversubscribedException> sharedSlotOverAllocatedException =
+						ExceptionUtils.findThrowable(failure, SharedSlotOversubscribedException.class);
+				if (sharedSlotOverAllocatedException.isPresent() &&
+						sharedSlotOverAllocatedException.get().canRetry()) {
+
+					// Retry the allocation
+					internalAllocateSlot(
+							allocationResultFuture,
+							slotRequestId,
+							scheduledUnit,
+							slotProfile,
+							allowQueuedScheduling,
+							allocationTimeout);
+				} else {
+					cancelSlotRequest(
+							slotRequestId,
+							scheduledUnit.getSlotSharingGroupId(),
+							failure);
+					allocationResultFuture.completeExceptionally(failure);
+				}
 			} else {
 				allocationResultFuture.complete(slot);
 			}
 		});
-
-		return allocationResultFuture;
 	}
 
 	@Override
@@ -354,7 +384,14 @@ public class SchedulerImpl implements Scheduler {
 
 			if (taskSlot != null) {
 				Preconditions.checkState(taskSlot instanceof SlotSharingManager.MultiTaskSlot);
-				return SlotSharingManager.MultiTaskSlotLocality.of(((SlotSharingManager.MultiTaskSlot) taskSlot), Locality.LOCAL);
+
+				SlotSharingManager.MultiTaskSlot multiTaskSlot = (SlotSharingManager.MultiTaskSlot) taskSlot;
+
+				if (multiTaskSlot.mayHaveEnoughResourcesToFulfill(slotProfile.getResourceProfile())) {
+					return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL);
+				}
+
+				throw new NoResourceAvailableException("Not enough resources in the slot for all co-located tasks.");
 			} else {
 				// the slot may have been cancelled in the mean time
 				coLocationConstraint.setSlotRequestId(null);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SharedSlotOversubscribedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SharedSlotOversubscribedException.java
new file mode 100644
index 0000000..8a61612
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SharedSlotOversubscribedException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+/**
+ * If a shared slot is over-allocated before it has been resolved,
+ * some requests will be rejected with this exception to ensure that
+ * the total requested resources do not exceed the total resources of
+ * the slot. The rejected requests can be retried if {@code canRetry} is set.
+ */
+class SharedSlotOversubscribedException extends Exception {
+
+	/** Whether the requester can retry the request. */
+	private final boolean canRetry;
+
+	SharedSlotOversubscribedException(String message, boolean canRetry) {
+		super(message);
+		this.canRetry = canRetry;
+	}
+
+	boolean canRetry() {
+		return canRetry;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index a7afbac..2c81a91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -41,6 +41,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.AbstractCollection;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -48,6 +49,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -356,9 +358,9 @@ public class SlotSharingManager {
 				CompletableFuture<? extends SlotContext> slotContextFuture,
 				@Nullable SlotRequestId allocatedSlotRequestId) {
 			super(slotRequestId, groupId);
+			Preconditions.checkNotNull(slotContextFuture);
 
 			this.parent = parent;
-			this.slotContextFuture = Preconditions.checkNotNull(slotContextFuture);
 			this.allocatedSlotRequestId = allocatedSlotRequestId;
 
 			this.children = new HashMap<>(16);
@@ -366,12 +368,19 @@ public class SlotSharingManager {
 
 			this.reservedResources = ResourceProfile.ZERO;
 
-			slotContextFuture.whenComplete(
-				(SlotContext ignored, Throwable throwable) -> {
-					if (throwable != null) {
-						release(throwable);
-					}
-				});
+			this.slotContextFuture = slotContextFuture.handle((SlotContext slotContext, Throwable throwable) -> {
+				if (throwable != null) {
+					// If the underlying resource request failed, we currently fail all the requests
+					release(throwable);
+					throw new CompletionException(throwable);
+				}
+
+				if (parent == null) {
+					checkOversubscriptionAndReleaseChildren(slotContext);
+				}
+
+				return slotContext;
+			});
 		}
 
 		CompletableFuture<? extends SlotContext> getSlotContextFuture() {
@@ -511,6 +520,30 @@ public class SlotSharingManager {
 		}
 
 		/**
+		 * Checks if the task slot may have enough resources to fulfill the given
+		 * request. If the underlying slot is not allocated yet, {@code true} is returned.
+		 *
+		 * @param resourceProfile The resource profile of the request to check.
+		 * @return Whether the slot may be able to fulfill the request.
+		 */
+		boolean mayHaveEnoughResourcesToFulfill(ResourceProfile resourceProfile) {
+			if (!slotContextFuture.isDone()) {
+				return true;
+			}
+
+			MultiTaskSlot root = this;
+
+			while (root.parent != null) {
+				root = root.parent;
+			}
+
+			SlotContext slotContext = root.getSlotContextFuture().join();
+
+			return slotContext.getResourceProfile().isMatching(
+					resourceProfile.merge(root.getReservedResources()));
+		}
+
+		/**
 		 * Releases the child with the given childGroupId.
 		 *
 		 * @param childGroupId identifying the child to release
@@ -548,6 +581,48 @@ public class SlotSharingManager {
 			}
 		}
 
+		private void checkOversubscriptionAndReleaseChildren(SlotContext slotContext) {
+			final ResourceProfile slotResources = slotContext.getResourceProfile();
+			final ArrayList<TaskSlot> childrenToEvict = new ArrayList<>();
+			ResourceProfile requiredResources = ResourceProfile.ZERO;
+
+			for (TaskSlot slot : children.values()) {
+				final ResourceProfile resourcesWithChild = requiredResources.merge(slot.getReservedResources());
+
+				if (slotResources.isMatching(resourcesWithChild)) {
+					requiredResources = resourcesWithChild;
+				} else {
+					childrenToEvict.add(slot);
+				}
+			}
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Not all requests are fulfilled due to over-allocated, number of requests is {}, " +
+						"number of evicted requests is {}, underlying allocated is {}, fulfilled is {}, " +
+						"evicted requests is {},",
+					children.size(),
+					childrenToEvict.size(),
+					slotContext.getResourceProfile(),
+					requiredResources,
+					childrenToEvict);
+			}
+
+			if (childrenToEvict.size() == children.size()) {
+				// Since the RM always returns a slot whose resources are larger than the requested
+				// ones, this situation only happens when we send the request to the RM using the
+				// resource profile of a task that belongs to a CoLocationGroup. Similar to dealing
+				// with the failure of the underlying request, we currently fail all the requests
+				// directly.
+				release(new SharedSlotOversubscribedException(
+					"The allocated slot does not have enough resource for any task.", false));
+			} else {
+				for (TaskSlot taskSlot : childrenToEvict) {
+					taskSlot.release(new SharedSlotOversubscribedException(
+						"The allocated slot does not have enough resource for all the tasks.", true));
+				}
+			}
+		}
+
 		@Override
 		public String toString() {
 			String physicalSlotDescription;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
index 44c9977..52cc3af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
@@ -41,6 +41,7 @@ import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -51,6 +52,7 @@ import java.util.concurrent.ExecutionException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test cases for {@link CoLocationConstraint} with the {@link SlotPoolImpl}.
@@ -158,4 +160,164 @@ public class SlotPoolCoLocationTest extends TestLogger {
 		assertEquals(logicalSlot21.getAllocationId(), logicalSlot22.getAllocationId());
 		assertNotEquals(logicalSlot11.getAllocationId(), logicalSlot21.getAllocationId());
 	}
+
+	@Test
+	public void testCoLocatedSlotRequestsFailBeforeResolved() throws ExecutionException, InterruptedException {
+		final ResourceProfile rp1 = new ResourceProfile(1.0, 100);
+		final ResourceProfile rp2 = new ResourceProfile(2.0, 200);
+		final ResourceProfile rp3 = new ResourceProfile(5.0, 500);
+
+		final ResourceProfile allocatedSlotRp = new ResourceProfile(3.0, 300);
+
+		final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(1);
+
+		final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
+
+		testingResourceManagerGateway.setRequestSlotConsumer(
+				(SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
+
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+		final SlotPool slotPoolGateway = slotPoolResource.getSlotPool();
+		slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
+
+		CoLocationGroup group = new CoLocationGroup();
+		CoLocationConstraint coLocationConstraint1 = group.getLocationConstraint(0);
+
+		final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+
+		JobVertexID jobVertexId1 = new JobVertexID();
+		JobVertexID jobVertexId2 = new JobVertexID();
+		JobVertexID jobVertexId3 = new JobVertexID();
+
+		final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
+		CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId1,
+						slotSharingGroupId,
+						coLocationConstraint1),
+				true,
+				SlotProfile.noLocality(rp1),
+				TestingUtils.infiniteTime());
+
+		CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId2,
+						slotSharingGroupId,
+						coLocationConstraint1),
+				true,
+				SlotProfile.noLocality(rp2),
+				TestingUtils.infiniteTime());
+
+		CompletableFuture<LogicalSlot> logicalSlotFuture3 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId3,
+						slotSharingGroupId,
+						coLocationConstraint1),
+				true,
+				SlotProfile.noLocality(rp3),
+				TestingUtils.infiniteTime());
+
+		final AllocationID allocationId1 = allocationIds.take();
+
+		Collection<SlotOffer> slotOfferFuture1 = slotPoolGateway.offerSlots(
+				taskManagerLocation,
+				new SimpleAckingTaskManagerGateway(),
+				Collections.singletonList(new SlotOffer(
+						allocationId1,
+						0,
+						allocatedSlotRp)));
+
+		assertFalse(slotOfferFuture1.isEmpty());
+
+		for (CompletableFuture<LogicalSlot> logicalSlotFuture : Arrays.asList(logicalSlotFuture1, logicalSlotFuture2, logicalSlotFuture3)) {
+			assertTrue(logicalSlotFuture.isDone() && logicalSlotFuture.isCompletedExceptionally());
+			logicalSlotFuture.whenComplete((LogicalSlot ignored, Throwable throwable) -> {
+				assertTrue(throwable instanceof SharedSlotOversubscribedException);
+				assertTrue(((SharedSlotOversubscribedException) throwable).canRetry());
+			});
+		}
+	}
+
+	@Test
+	public void testCoLocatedSlotRequestsFailAfterResolved() throws ExecutionException, InterruptedException {
+		final ResourceProfile rp1 = new ResourceProfile(1.0, 100);
+		final ResourceProfile rp2 = new ResourceProfile(2.0, 200);
+		final ResourceProfile rp3 = new ResourceProfile(5.0, 500);
+
+		final ResourceProfile allocatedSlotRp = new ResourceProfile(3.0, 300);
+
+		final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(1);
+
+		final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
+
+		testingResourceManagerGateway.setRequestSlotConsumer(
+				(SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
+
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+		final SlotPool slotPoolGateway = slotPoolResource.getSlotPool();
+		slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
+
+		CoLocationGroup group = new CoLocationGroup();
+		CoLocationConstraint coLocationConstraint1 = group.getLocationConstraint(0);
+
+		final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+
+		JobVertexID jobVertexId1 = new JobVertexID();
+		JobVertexID jobVertexId2 = new JobVertexID();
+		JobVertexID jobVertexId3 = new JobVertexID();
+
+		final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
+		CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId1,
+						slotSharingGroupId,
+						coLocationConstraint1),
+				true,
+				SlotProfile.noLocality(rp1),
+				TestingUtils.infiniteTime());
+
+		CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId2,
+						slotSharingGroupId,
+						coLocationConstraint1),
+				true,
+				SlotProfile.noLocality(rp2),
+				TestingUtils.infiniteTime());
+
+		final AllocationID allocationId1 = allocationIds.take();
+
+		Collection<SlotOffer> slotOfferFuture1 = slotPoolGateway.offerSlots(
+				taskManagerLocation,
+				new SimpleAckingTaskManagerGateway(),
+				Collections.singletonList(new SlotOffer(
+						allocationId1,
+						0,
+						allocatedSlotRp)));
+
+		assertFalse(slotOfferFuture1.isEmpty());
+
+		CompletableFuture<LogicalSlot> logicalSlotFuture3 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId3,
+						slotSharingGroupId,
+						coLocationConstraint1),
+				true,
+				SlotProfile.noLocality(rp3),
+				TestingUtils.infiniteTime());
+
+		LogicalSlot logicalSlot1 = logicalSlotFuture1.get();
+		LogicalSlot logicalSlot2 = logicalSlotFuture2.get();
+
+		assertEquals(allocationId1, logicalSlot1.getAllocationId());
+		assertEquals(allocationId1, logicalSlot2.getAllocationId());
+
+		assertTrue(logicalSlotFuture3.isDone() && logicalSlotFuture3.isCompletedExceptionally());
+		logicalSlotFuture3.whenComplete((LogicalSlot ignored, Throwable throwable) -> {
+			assertTrue(throwable instanceof SharedSlotOversubscribedException);
+			assertTrue(((SharedSlotOversubscribedException) throwable).canRetry());
+		});
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
index f1447d0..f2b8fa7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
@@ -331,4 +331,96 @@ public class SlotPoolSlotSharingTest extends TestLogger {
 		assertEquals(allocationId2, logicalSlot3.getAllocationId());
 	}
 
+	@Test
+	public void testRetryOnSharedSlotOverAllocated() throws InterruptedException, ExecutionException {
+		final ResourceProfile rp1 = new ResourceProfile(1.0, 100);
+		final ResourceProfile rp2 = new ResourceProfile(2.0, 200);
+		final ResourceProfile rp3 = new ResourceProfile(5.0, 500);
+
+		final ResourceProfile firstAllocatedSlotRp = new ResourceProfile(3.0, 300);
+		final ResourceProfile secondAllocatedSlotRp = new ResourceProfile(5.0, 500);
+
+		final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(2);
+		final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
+		testingResourceManagerGateway.setRequestSlotConsumer(
+				(SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
+
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+		final SlotPoolImpl slotPool = slotPoolResource.getSlotPool();
+		slotPool.registerTaskManager(taskManagerLocation.getResourceID());
+
+		final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+		final JobVertexID jobVertexId1 = new JobVertexID();
+		final JobVertexID jobVertexId2 = new JobVertexID();
+		final JobVertexID jobVertexId3 = new JobVertexID();
+
+		final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
+		CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId1,
+						slotSharingGroupId,
+						null),
+				true,
+				SlotProfile.noLocality(rp1),
+				TestingUtils.infiniteTime());
+
+		CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId2,
+						slotSharingGroupId,
+						null),
+				true,
+				SlotProfile.noLocality(rp2),
+				TestingUtils.infiniteTime());
+
+		CompletableFuture<LogicalSlot> logicalSlotFuture3 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId3,
+						slotSharingGroupId,
+						null),
+				true,
+				SlotProfile.noLocality(rp3),
+				TestingUtils.infiniteTime());
+
+		assertFalse(logicalSlotFuture1.isDone());
+		assertFalse(logicalSlotFuture2.isDone());
+		assertFalse(logicalSlotFuture3.isDone());
+
+		final AllocationID allocationId1 = allocationIds.take();
+
+		// This should fulfill the first two requests.
+		boolean offerFuture = slotPool.offerSlot(
+				taskManagerLocation,
+				new SimpleAckingTaskManagerGateway(),
+				new SlotOffer(
+						allocationId1,
+						0,
+						firstAllocatedSlotRp));
+
+		assertTrue(offerFuture);
+
+		LogicalSlot logicalSlot1 = logicalSlotFuture1.get();
+		LogicalSlot logicalSlot2 = logicalSlotFuture2.get();
+
+		assertEquals(allocationId1, logicalSlot1.getAllocationId());
+		assertEquals(allocationId1, logicalSlot2.getAllocationId());
+
+		// The third request will retry.
+		assertFalse(logicalSlotFuture3.isDone());
+		final AllocationID allocationId2 = allocationIds.take();
+
+		offerFuture = slotPool.offerSlot(
+				taskManagerLocation,
+				new SimpleAckingTaskManagerGateway(),
+				new SlotOffer(
+						allocationId2,
+						1,
+						secondAllocatedSlotRp));
+
+		assertTrue(offerFuture);
+
+		LogicalSlot logicalSlot3 = logicalSlotFuture3.get();
+		assertEquals(allocationId2, logicalSlot3.getAllocationId());
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
index 0742ac2..9bc1d9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.resources.GPUResource;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -27,11 +28,13 @@ import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -39,18 +42,28 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
+import java.net.InetAddress;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 /**
  * Test cases for the {@link SlotSharingManager}.
@@ -605,4 +618,210 @@ public class SlotSharingManagerTest extends TestLogger {
 		firstChild.release(new Throwable("Release for testing"));
 		assertEquals(ResourceProfile.ZERO, unresolvedRootSlot.getReservedResources());
 	}
+
+	@Test
+	public void testHashEnoughResourceOfMultiTaskSlot() {
+		ResourceProfile rp1 = new ResourceProfile(1.0, 100);
+		ResourceProfile rp2 = new ResourceProfile(2.0, 200);
+		ResourceProfile allocatedSlotRp = new ResourceProfile(2.0, 200);
+
+		final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions();
+
+		SlotSharingManager slotSharingManager = new SlotSharingManager(
+				SLOT_SHARING_GROUP_ID,
+				allocatedSlotActions,
+				SLOT_OWNER);
+
+		CompletableFuture<SlotContext> slotContextFuture = new CompletableFuture<>();
+
+		SlotSharingManager.MultiTaskSlot unresolvedRootSlot = slotSharingManager.createRootSlot(
+				new SlotRequestId(),
+				slotContextFuture,
+				new SlotRequestId());
+
+		SlotSharingManager.MultiTaskSlot multiTaskSlot =
+				unresolvedRootSlot.allocateMultiTaskSlot(new SlotRequestId(), new SlotSharingGroupId());
+
+		SlotSharingManager.SingleTaskSlot firstChild = multiTaskSlot.allocateSingleTaskSlot(
+				new SlotRequestId(),
+				rp1,
+				new SlotSharingGroupId(),
+				Locality.LOCAL);
+
+		assertThat(multiTaskSlot.mayHaveEnoughResourcesToFulfill(rp1), is(true));
+		assertThat(multiTaskSlot.mayHaveEnoughResourcesToFulfill(rp2), is(true));
+		assertThat(multiTaskSlot.mayHaveEnoughResourcesToFulfill(ResourceProfile.UNKNOWN), is(true));
+
+		slotContextFuture.complete(new AllocatedSlot(
+				new AllocationID(),
+				new TaskManagerLocation(new ResourceID("tm-X"), InetAddress.getLoopbackAddress(), 46),
+				0,
+				allocatedSlotRp,
+				mock(TaskManagerGateway.class)));
+
+		assertThat(multiTaskSlot.mayHaveEnoughResourcesToFulfill(rp1), is(true));
+		assertThat(multiTaskSlot.mayHaveEnoughResourcesToFulfill(rp2), is(false));
+		assertThat(multiTaskSlot.mayHaveEnoughResourcesToFulfill(ResourceProfile.UNKNOWN), is(true));
+	}
+
+	@Test
+	public void testSlotAllocatedWithEnoughResource() {
+		SlotSharingResourceTestContext context = createResourceTestContext(new ResourceProfile(16.0, 1600));
+
+		// With enough resources, all the requests should be fulfilled.
+		for (SlotSharingManager.SingleTaskSlot singleTaskSlot : context.singleTaskSlotsInOrder) {
+			assertThat(singleTaskSlot.getLogicalSlotFuture().isDone(), is(true));
+			assertThat(singleTaskSlot.getLogicalSlotFuture().isCompletedExceptionally(), is(false));
+		}
+
+		// The multi-task slot for coLocation should be kept.
+		assertThat(context.slotSharingManager.getTaskSlot(context.coLocationTaskSlot.getSlotRequestId()), notNullValue());
+	}
+
+	@Test
+	public void testSlotOverAllocatedAndSingleSlotReleased() {
+		SlotSharingResourceTestContext context = createResourceTestContext(new ResourceProfile(7.0, 700));
+
+		// The two coLocated requests and the third request are successful.
+		for (int i = 0; i < context.singleTaskSlotsInOrder.size(); ++i) {
+			SlotSharingManager.SingleTaskSlot singleTaskSlot = context.singleTaskSlotsInOrder.get(i);
+			assertThat(singleTaskSlot.getLogicalSlotFuture().isDone(), is(true));
+
+			if (i != 3) {
+				assertThat(singleTaskSlot.getLogicalSlotFuture().isCompletedExceptionally(), is(false));
+			} else {
+				assertThat(singleTaskSlot.getLogicalSlotFuture().isCompletedExceptionally(), is(true));
+				singleTaskSlot.getLogicalSlotFuture().whenComplete((LogicalSlot ignored, Throwable throwable) -> {
+					assertThat(throwable instanceof SharedSlotOversubscribedException, is(true));
+					assertThat(((SharedSlotOversubscribedException) throwable).canRetry(), is(true));
+				});
+			}
+		}
+
+		// The multi-task slot for coLocation should be kept.
+		assertThat(context.slotSharingManager.getTaskSlot(context.coLocationTaskSlot.getSlotRequestId()), notNullValue());
+	}
+
+	@Test
+	public void testSlotOverAllocatedAndMultiTaskSlotReleased() {
+		SlotSharingResourceTestContext context = createResourceTestContext(new ResourceProfile(3.0, 300));
+
+		// Only the third request is fulfilled.
+		for (int i = 0; i < context.singleTaskSlotsInOrder.size(); ++i) {
+			SlotSharingManager.SingleTaskSlot singleTaskSlot = context.singleTaskSlotsInOrder.get(i);
+			assertThat(singleTaskSlot.getLogicalSlotFuture().isDone(), is(true));
+
+			if (i == 2) {
+				assertThat(singleTaskSlot.getLogicalSlotFuture().isCompletedExceptionally(), is(false));
+			} else {
+				assertThat(singleTaskSlot.getLogicalSlotFuture().isCompletedExceptionally(), is(true));
+				singleTaskSlot.getLogicalSlotFuture().whenComplete((LogicalSlot ignored, Throwable throwable) -> {
+					assertThat(throwable instanceof SharedSlotOversubscribedException, is(true));
+					assertThat(((SharedSlotOversubscribedException) throwable).canRetry(), is(true));
+				});
+			}
+		}
+
+		// The multi-task slot for coLocation should not be kept.
+		assertThat(context.slotSharingManager.getTaskSlot(context.coLocationTaskSlot.getSlotRequestId()), nullValue());
+	}
+
+	@Test
+	public void testSlotOverAllocatedAndAllTaskSlotReleased() {
+		SlotSharingResourceTestContext context = createResourceTestContext(new ResourceProfile(2.0, 200));
+
+		// None of the requests is fulfilled.
+		for (int i = 0; i < context.singleTaskSlotsInOrder.size(); ++i) {
+			SlotSharingManager.SingleTaskSlot singleTaskSlot = context.singleTaskSlotsInOrder.get(i);
+			assertThat(singleTaskSlot.getLogicalSlotFuture().isDone(), is(true));
+
+			assertThat(singleTaskSlot.getLogicalSlotFuture().isCompletedExceptionally(), is(true));
+			singleTaskSlot.getLogicalSlotFuture().whenComplete((LogicalSlot ignored, Throwable throwable) -> {
+				assertThat(throwable instanceof SharedSlotOversubscribedException, is(true));
+
+				// Since no request is fulfilled, these requests will be failed and should not retry.
+				assertThat(((SharedSlotOversubscribedException) throwable).canRetry(), is(false));
+			});
+		}
+
+		// All the task slots should be removed.
+		assertThat(context.slotSharingManager.isEmpty(), is(true));
+	}
+
+	private SlotSharingResourceTestContext createResourceTestContext(ResourceProfile allocatedResourceProfile) {
+		ResourceProfile coLocationTaskRp = new ResourceProfile(2.0, 200);
+		ResourceProfile thirdChildRp = new ResourceProfile(3.0, 300);
+		ResourceProfile forthChildRp = new ResourceProfile(9.0, 900);
+
+		final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions();
+
+		SlotSharingManager slotSharingManager = new SlotSharingManager(
+				SLOT_SHARING_GROUP_ID,
+				allocatedSlotActions,
+				SLOT_OWNER);
+
+		CompletableFuture<SlotContext> slotContextFuture = new CompletableFuture<>();
+
+		SlotSharingManager.MultiTaskSlot unresolvedRootSlot = slotSharingManager.createRootSlot(
+				new SlotRequestId(),
+				slotContextFuture,
+				new SlotRequestId());
+
+		SlotSharingManager.MultiTaskSlot coLocationTaskSlot = unresolvedRootSlot.allocateMultiTaskSlot(
+				new SlotRequestId(), new SlotSharingGroupId());
+
+		SlotSharingManager.SingleTaskSlot firstCoLocatedChild = coLocationTaskSlot.allocateSingleTaskSlot(
+				new SlotRequestId(),
+				coLocationTaskRp,
+				new SlotSharingGroupId(),
+				Locality.LOCAL);
+		SlotSharingManager.SingleTaskSlot secondCoLocatedChild = coLocationTaskSlot.allocateSingleTaskSlot(
+				new SlotRequestId(),
+				coLocationTaskRp,
+				new SlotSharingGroupId(),
+				Locality.LOCAL);
+
+		SlotSharingManager.SingleTaskSlot thirdChild = unresolvedRootSlot.allocateSingleTaskSlot(
+				new SlotRequestId(),
+				thirdChildRp,
+				new SlotSharingGroupId(),
+				Locality.LOCAL);
+
+		SlotSharingManager.SingleTaskSlot forthChild = unresolvedRootSlot.allocateSingleTaskSlot(
+				new SlotRequestId(),
+				forthChildRp,
+				new SlotSharingGroupId(),
+				Locality.LOCAL);
+
+		slotContextFuture.complete(new AllocatedSlot(
+				new AllocationID(),
+				new TaskManagerLocation(new ResourceID("tm-X"), InetAddress.getLoopbackAddress(), 46),
+				0,
+				allocatedResourceProfile,
+				mock(TaskManagerGateway.class)));
+
+		return new SlotSharingResourceTestContext(
+				slotSharingManager,
+				coLocationTaskSlot,
+				Arrays.asList(firstCoLocatedChild, secondCoLocatedChild, thirdChild, forthChild));
+	}
+
+	/**
+	 * A utility class that maintains the slot sharing hierarchy used for testing.
+	 */
+	private class SlotSharingResourceTestContext {
+		final SlotSharingManager slotSharingManager;
+		final SlotSharingManager.MultiTaskSlot coLocationTaskSlot;
+		final List<SlotSharingManager.SingleTaskSlot> singleTaskSlotsInOrder;
+
+		SlotSharingResourceTestContext(
+				@Nonnull SlotSharingManager slotSharingManager,
+				@Nonnull SlotSharingManager.MultiTaskSlot coLocationTaskSlot,
+				@Nonnull List<SlotSharingManager.SingleTaskSlot> singleTaskSlotsInOrder) {
+
+			this.slotSharingManager = slotSharingManager;
+			this.coLocationTaskSlot = coLocationTaskSlot;
+			this.singleTaskSlotsInOrder = singleTaskSlotsInOrder;
+		}
+	}
 }