You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/25 21:43:59 UTC

[flink] branch release-1.9 updated: [FLINK-12765][coordinator] Add the remaining resources to the slot selection strategy

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 0b1dafd  [FLINK-12765][coordinator] Add the remaining resources to the slot selection strategy
0b1dafd is described below

commit 0b1dafd52d380a8c1b312fa5cf51185367cfd83d
Author: Gao Yun <yu...@alibaba-inc.com>
AuthorDate: Sat Jul 13 09:02:48 2019 +0800

    [FLINK-12765][coordinator] Add the remaining resources to the slot selection strategy
---
 .../LocationPreferenceSlotSelectionStrategy.java   | 25 +++---
 .../PreviousAllocationSlotSelectionStrategy.java   | 21 +++--
 .../runtime/jobmaster/slotpool/SchedulerImpl.java  | 11 ++-
 .../jobmaster/slotpool/SlotSelectionStrategy.java  | 35 ++++++++-
 .../jobmaster/slotpool/SlotSharingManager.java     | 14 ++--
 ...ocationPreferenceSlotSelectionStrategyTest.java | 13 +++-
 .../types/SlotSelectionStrategyTestBase.java       | 15 ++--
 .../slotpool/SlotPoolSlotSharingTest.java          | 89 ++++++++++++++++++++++
 .../jobmaster/slotpool/SlotSharingManagerTest.java | 54 +++++++++++--
 9 files changed, 228 insertions(+), 49 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java
index 4325d91..1a80c2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java
@@ -22,7 +22,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import javax.annotation.Nonnull;
@@ -48,7 +47,7 @@ public enum LocationPreferenceSlotSelectionStrategy implements SlotSelectionStra
 
 	@Override
 	public Optional<SlotInfoAndLocality> selectBestSlotForProfile(
-		@Nonnull Collection<? extends SlotInfo> availableSlots,
+		@Nonnull Collection<SlotInfoAndResources> availableSlots,
 		@Nonnull SlotProfile slotProfile) {
 
 		Collection<TaskManagerLocation> locationPreferences = slotProfile.getPreferredLocations();
@@ -67,12 +66,12 @@ public enum LocationPreferenceSlotSelectionStrategy implements SlotSelectionStra
 
 	@Nonnull
 	private Optional<SlotInfoAndLocality> selectWithoutLocationPreference(
-		@Nonnull Collection<? extends SlotInfo> availableSlots,
+		@Nonnull Collection<SlotInfoAndResources> availableSlots,
 		@Nonnull ResourceProfile resourceProfile) {
 
-		for (SlotInfo candidate : availableSlots) {
-			if (candidate.getResourceProfile().isMatching(resourceProfile)) {
-				return Optional.of(SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED));
+		for (SlotInfoAndResources candidate : availableSlots) {
+			if (candidate.getRemainingResources().isMatching(resourceProfile)) {
+				return Optional.of(SlotInfoAndLocality.of(candidate.getSlotInfo(), Locality.UNCONSTRAINED));
 			}
 		}
 		return Optional.empty();
@@ -80,7 +79,7 @@ public enum LocationPreferenceSlotSelectionStrategy implements SlotSelectionStra
 
 	@Nonnull
 	private Optional<SlotInfoAndLocality> selectWitLocationPreference(
-		@Nonnull Collection<? extends SlotInfo> availableSlots,
+		@Nonnull Collection<SlotInfoAndResources> availableSlots,
 		@Nonnull Collection<TaskManagerLocation> locationPreferences,
 		@Nonnull ResourceProfile resourceProfile) {
 
@@ -93,21 +92,21 @@ public enum LocationPreferenceSlotSelectionStrategy implements SlotSelectionStra
 			preferredFQHostNames.merge(locationPreference.getFQDNHostname(), 1, Integer::sum);
 		}
 
-		SlotInfo bestCandidate = null;
+		SlotInfoAndResources bestCandidate = null;
 		Locality bestCandidateLocality = Locality.UNKNOWN;
 		int bestCandidateScore = Integer.MIN_VALUE;
 
-		for (SlotInfo candidate : availableSlots) {
+		for (SlotInfoAndResources candidate : availableSlots) {
 
-			if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+			if (candidate.getRemainingResources().isMatching(resourceProfile)) {
 
 				// this gets candidate is local-weigh
 				Integer localWeigh = preferredResourceIDs.getOrDefault(
-					candidate.getTaskManagerLocation().getResourceID(), 0);
+					candidate.getSlotInfo().getTaskManagerLocation().getResourceID(), 0);
 
 				// this gets candidate is host-local-weigh
 				Integer hostLocalWeigh = preferredFQHostNames.getOrDefault(
-					candidate.getTaskManagerLocation().getFQDNHostname(), 0);
+					candidate.getSlotInfo().getTaskManagerLocation().getFQDNHostname(), 0);
 
 				int candidateScore = LOCALITY_EVALUATION_FUNCTION.apply(localWeigh, hostLocalWeigh);
 				if (candidateScore > bestCandidateScore) {
@@ -122,7 +121,7 @@ public enum LocationPreferenceSlotSelectionStrategy implements SlotSelectionStra
 
 		// at the end of the iteration, we return the candidate with best possible locality or null.
 		return bestCandidate != null ?
-			Optional.of(SlotInfoAndLocality.of(bestCandidate, bestCandidateLocality)) :
+			Optional.of(SlotInfoAndLocality.of(bestCandidate.getSlotInfo(), bestCandidateLocality)) :
 			Optional.empty();
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
index e2e7bc2..f8feda8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
 
 import javax.annotation.Nonnull;
 
@@ -41,39 +40,39 @@ public enum PreviousAllocationSlotSelectionStrategy implements SlotSelectionStra
 
 	@Override
 	public Optional<SlotInfoAndLocality> selectBestSlotForProfile(
-		@Nonnull Collection<? extends SlotInfo> availableSlots,
+		@Nonnull Collection<SlotInfoAndResources> availableSlots,
 		@Nonnull SlotProfile slotProfile) {
 
 		Collection<AllocationID> priorAllocations = slotProfile.getPreferredAllocations();
 
 		// First, if there was a prior allocation try to schedule to the same/old slot
 		if (!priorAllocations.isEmpty()) {
-			for (SlotInfo availableSlot : availableSlots) {
-				if (priorAllocations.contains(availableSlot.getAllocationId())) {
+			for (SlotInfoAndResources availableSlot : availableSlots) {
+				if (priorAllocations.contains(availableSlot.getSlotInfo().getAllocationId())) {
 					return Optional.of(
-						SlotInfoAndLocality.of(availableSlot, Locality.LOCAL));
+						SlotInfoAndLocality.of(availableSlot.getSlotInfo(), Locality.LOCAL));
 				}
 			}
 		}
 
 		// Second, select based on location preference, excluding blacklisted allocations
 		Set<AllocationID> blackListedAllocations = slotProfile.getPreviousExecutionGraphAllocations();
-		Collection<? extends SlotInfo> availableAndAllowedSlots = computeWithoutBlacklistedSlots(availableSlots, blackListedAllocations);
+		Collection<SlotInfoAndResources> availableAndAllowedSlots = computeWithoutBlacklistedSlots(availableSlots, blackListedAllocations);
 		return LocationPreferenceSlotSelectionStrategy.INSTANCE.selectBestSlotForProfile(availableAndAllowedSlots, slotProfile);
 	}
 
 	@Nonnull
-	private Collection<SlotInfo> computeWithoutBlacklistedSlots(
-		@Nonnull Collection<? extends SlotInfo> availableSlots,
+	private Collection<SlotInfoAndResources> computeWithoutBlacklistedSlots(
+		@Nonnull Collection<SlotInfoAndResources> availableSlots,
 		@Nonnull Set<AllocationID> blacklistedAllocations) {
 
 		if (blacklistedAllocations.isEmpty()) {
 			return Collections.unmodifiableCollection(availableSlots);
 		}
 
-		ArrayList<SlotInfo> availableAndAllowedSlots = new ArrayList<>(availableSlots.size());
-		for (SlotInfo availableSlot : availableSlots) {
-			if (!blacklistedAllocations.contains(availableSlot.getAllocationId())) {
+		ArrayList<SlotInfoAndResources> availableAndAllowedSlots = new ArrayList<>(availableSlots.size());
+		for (SlotInfoAndResources availableSlot : availableSlots) {
+			if (!blacklistedAllocations.contains(availableSlot.getSlotInfo().getAllocationId())) {
 				availableAndAllowedSlots.add(availableSlot);
 			}
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
index ef87dd1..4bf8eab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotContext;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.ExceptionUtils;
@@ -51,6 +50,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
 
 /**
  * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we
@@ -293,7 +293,11 @@ public class SchedulerImpl implements Scheduler {
 		@Nonnull SlotRequestId slotRequestId,
 		@Nonnull SlotProfile slotProfile) {
 
-		Collection<SlotInfo> slotInfoList = slotPool.getAvailableSlotsInformation();
+		Collection<SlotSelectionStrategy.SlotInfoAndResources> slotInfoList =
+				slotPool.getAvailableSlotsInformation()
+						.stream()
+						.map(SlotSelectionStrategy.SlotInfoAndResources::new)
+						.collect(Collectors.toList());
 
 		Optional<SlotSelectionStrategy.SlotInfoAndLocality> selectedAvailableSlot =
 			slotSelectionStrategy.selectBestSlotForProfile(slotInfoList, slotProfile);
@@ -479,7 +483,8 @@ public class SchedulerImpl implements Scheduler {
 			boolean allowQueuedScheduling,
 			@Nullable Time allocationTimeout) throws NoResourceAvailableException {
 
-		Collection<SlotInfo> resolvedRootSlotsInfo = slotSharingManager.listResolvedRootSlotInfo(groupId);
+		Collection<SlotSelectionStrategy.SlotInfoAndResources> resolvedRootSlotsInfo =
+				slotSharingManager.listResolvedRootSlotInfo(groupId);
 
 		SlotSelectionStrategy.SlotInfoAndLocality bestResolvedRootSlotWithLocality =
 			slotSelectionStrategy.selectBestSlotForProfile(resolvedRootSlotsInfo, slotProfile).orElse(null);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java
index 1442145..7394115 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
@@ -36,14 +37,44 @@ public interface SlotSelectionStrategy {
 	 * Selects the best {@link SlotInfo} w.r.t. a certain selection criterion from the provided list of available slots
 	 * and considering the given {@link SlotProfile} that describes the requirements.
 	 *
-	 * @param availableSlots a list of the available slots to select from.
+	 * @param availableSlots a list of the available slots together with their remaining resources to select from.
 	 * @param slotProfile a slot profile, describing requirements for the slot selection.
 	 * @return the selected slot info with the corresponding locality hint.
 	 */
 	Optional<SlotInfoAndLocality> selectBestSlotForProfile(
-		@Nonnull Collection<? extends SlotInfo> availableSlots,
+		@Nonnull Collection<SlotInfoAndResources> availableSlots,
 		@Nonnull SlotProfile slotProfile);
 
+	/**
+	 * Value type pairing a {@link SlotInfo} with the {@link ResourceProfile} still remaining in it; the one-argument constructor uses the slot's full profile.
+	 */
+	final class SlotInfoAndResources {
+
+		@Nonnull
+		private final SlotInfo slotInfo;
+
+		@Nonnull
+		private final ResourceProfile remainingResources;
+
+		public SlotInfoAndResources(@Nonnull SlotInfo slotInfo) {
+			this(slotInfo, slotInfo.getResourceProfile());
+		}
+
+		public SlotInfoAndResources(@Nonnull SlotInfo slotInfo, @Nonnull ResourceProfile remainingResources) {
+			this.slotInfo = slotInfo;
+			this.remainingResources = remainingResources;
+		}
+
+		@Nonnull
+		public SlotInfo getSlotInfo() {
+			return slotInfo;
+		}
+
+		@Nonnull
+		public ResourceProfile getRemainingResources() {
+			return remainingResources;
+		}
+	}
 
 	/**
 	 * This class is a value type that combines a {@link SlotInfo} with a {@link Locality} hint.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index 2c81a91..b2aeed3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -180,14 +180,18 @@ public class SlotSharingManager {
 	}
 
 	@Nonnull
-	public Collection<SlotInfo> listResolvedRootSlotInfo(@Nullable AbstractID groupId) {
+	public Collection<SlotSelectionStrategy.SlotInfoAndResources> listResolvedRootSlotInfo(@Nullable AbstractID groupId) {
 		return resolvedRootSlots
 			.values()
 			.stream()
-			.flatMap((Map<AllocationID, MultiTaskSlot> map) -> map.values().stream())
-			.filter((MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId))
-			.map((MultiTaskSlot multiTaskSlot) -> (SlotInfo) multiTaskSlot.getSlotContextFuture().join())
-			.collect(Collectors.toList());
+				.flatMap((Map<AllocationID, MultiTaskSlot> map) -> map.values().stream())
+				.filter((MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId))
+				.map((MultiTaskSlot multiTaskSlot) -> {
+					SlotInfo slotInfo = multiTaskSlot.getSlotContextFuture().join();
+					return new SlotSelectionStrategy.SlotInfoAndResources(
+							slotInfo,
+							slotInfo.getResourceProfile().subtract(multiTaskSlot.getReservedResources()));
+				}).collect(Collectors.toList());
 	}
 
 	@Nullable
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java
index 2b1d6b0..378d818 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 public class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionStrategyTestBase {
 
@@ -61,7 +62,11 @@ public class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionSt
 		SlotProfile slotProfile = new SlotProfile(ResourceProfile.UNKNOWN, Collections.emptyList(), Collections.emptySet());
 		Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile);
 
-		Assert.assertTrue(candidates.contains(match.get().getSlotInfo()));
+		Assert.assertTrue(
+				candidates.stream()
+						.map(SlotSelectionStrategy.SlotInfoAndResources::getSlotInfo)
+						.collect(Collectors.toList())
+						.contains(match.get().getSlotInfo()));
 	}
 
 	@Test
@@ -70,7 +75,11 @@ public class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionSt
 		SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tmlX), Collections.emptySet());
 		Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile);
 
-		Assert.assertTrue(candidates.contains(match.get().getSlotInfo()));
+		Assert.assertTrue(
+				candidates.stream()
+						.map(SlotSelectionStrategy.SlotInfoAndResources::getSlotInfo)
+						.collect(Collectors.toList())
+						.contains(match.get().getSlotInfo()));
 	}
 
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java
index 914f6e3..e420cf4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.clusterframework.types;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
@@ -56,7 +55,7 @@ public abstract class SlotSelectionStrategyTestBase extends TestLogger {
 	protected final SimpleSlotContext ssc3 = new SimpleSlotContext(aid3, tml3, 3, taskManagerGateway, resourceProfile);
 	protected final SimpleSlotContext ssc4 = new SimpleSlotContext(aid4, tml4, 4, taskManagerGateway, resourceProfile);
 
-	protected final Set<SlotContext> candidates = Collections.unmodifiableSet(createCandidates());
+	protected final Set<SlotSelectionStrategy.SlotInfoAndResources> candidates = Collections.unmodifiableSet(createCandidates());
 
 	protected final SlotSelectionStrategy selectionStrategy;
 
@@ -64,12 +63,12 @@ public abstract class SlotSelectionStrategyTestBase extends TestLogger {
 		this.selectionStrategy = slotSelectionStrategy;
 	}
 
-	private Set<SlotContext> createCandidates() {
-		Set<SlotContext> candidates = new HashSet<>(4);
-		candidates.add(ssc1);
-		candidates.add(ssc2);
-		candidates.add(ssc3);
-		candidates.add(ssc4);
+	private Set<SlotSelectionStrategy.SlotInfoAndResources> createCandidates() {
+		Set<SlotSelectionStrategy.SlotInfoAndResources> candidates = new HashSet<>(4);
+		candidates.add(new SlotSelectionStrategy.SlotInfoAndResources(ssc1));
+		candidates.add(new SlotSelectionStrategy.SlotInfoAndResources(ssc2));
+		candidates.add(new SlotSelectionStrategy.SlotInfoAndResources(ssc3));
+		candidates.add(new SlotSelectionStrategy.SlotInfoAndResources(ssc4));
 		return candidates;
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
index f2b8fa7..caa3fc3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
@@ -331,6 +331,95 @@ public class SlotPoolSlotSharingTest extends TestLogger {
 		assertEquals(allocationId2, logicalSlot3.getAllocationId());
 	}
 
+	/**
+	 * Tests that when matching from the allocated slot, the remaining resources of the slot
+	 * will be used instead of the total resource.
+	 */
+	@Test
+	public void testSlotSharingRespectsRemainingResource() throws Exception {
+		final ResourceProfile allocatedSlotRp = new ResourceProfile(3.0, 300);
+		final ResourceProfile largeRequestResource = new ResourceProfile(2.0, 200);
+		final ResourceProfile smallRequestResource = new ResourceProfile(1.0, 100);
+
+		final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(2);
+		final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
+		testingResourceManagerGateway.setRequestSlotConsumer(
+				(SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
+
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+		final SlotPoolImpl slotPool = slotPoolResource.getSlotPool();
+		slotPool.registerTaskManager(taskManagerLocation.getResourceID());
+
+		final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+		final JobVertexID jobVertexId1 = new JobVertexID();
+		final JobVertexID jobVertexId2 = new JobVertexID();
+		final JobVertexID jobVertexId3 = new JobVertexID();
+
+		final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
+		CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId1,
+						slotSharingGroupId,
+						null),
+				true,
+				SlotProfile.noLocality(largeRequestResource),
+				TestingUtils.infiniteTime());
+
+		final AllocationID allocationId1 = allocationIds.take();
+
+		// This should fulfill the first request.
+		boolean offerFuture = slotPool.offerSlot(
+				taskManagerLocation,
+				new SimpleAckingTaskManagerGateway(),
+				new SlotOffer(
+						allocationId1,
+						0,
+						allocatedSlotRp));
+
+		assertTrue(offerFuture);
+		assertTrue(logicalSlotFuture1.isDone());
+		assertEquals(allocationId1, logicalSlotFuture1.get().getAllocationId());
+
+		// The second request should not share the same slot with the first request since it is large.
+		CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId2,
+						slotSharingGroupId,
+						null),
+				true,
+				SlotProfile.noLocality(largeRequestResource),
+				TestingUtils.infiniteTime());
+		assertFalse(logicalSlotFuture2.isDone());
+
+		// The third request should be able to share the same slot with the first request since it is small.
+		CompletableFuture<LogicalSlot> logicalSlotFuture3 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId3,
+						slotSharingGroupId,
+						null),
+				true,
+				SlotProfile.noLocality(smallRequestResource),
+				TestingUtils.infiniteTime());
+		assertTrue(logicalSlotFuture3.isDone());
+		assertEquals(allocationId1, logicalSlotFuture3.get().getAllocationId());
+
+		// The second request should be finally fulfilled by a new slot.
+		final AllocationID allocationId2 = allocationIds.take();
+		// This should fulfill the second request.
+		offerFuture = slotPool.offerSlot(
+				taskManagerLocation,
+				new SimpleAckingTaskManagerGateway(),
+				new SlotOffer(
+						allocationId2,
+						0,
+						allocatedSlotRp));
+
+		assertTrue(offerFuture);
+		assertTrue(logicalSlotFuture2.isDone());
+		assertEquals(allocationId2, logicalSlotFuture2.get().getAllocationId());
+	}
+
 	@Test
 	public void testRetryOnSharedSlotOverAllocated() throws InterruptedException, ExecutionException {
 		final ResourceProfile rp1 = new ResourceProfile(1.0, 100);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
index 9bc1d9b..ecf2e4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotContext;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -439,12 +438,12 @@ public class SlotSharingManagerTest extends TestLogger {
 
 		AbstractID groupId = new AbstractID();
 
-		Collection<SlotInfo> slotInfos = slotSharingManager.listResolvedRootSlotInfo(groupId);
+		Collection<SlotSelectionStrategy.SlotInfoAndResources> slotInfos = slotSharingManager.listResolvedRootSlotInfo(groupId);
 		Assert.assertEquals(1, slotInfos.size());
 
-		SlotInfo slotInfo = slotInfos.iterator().next();
+		SlotSelectionStrategy.SlotInfoAndResources slotInfoAndRemainingResource = slotInfos.iterator().next();
 		SlotSharingManager.MultiTaskSlot resolvedMultiTaskSlot =
-			slotSharingManager.getResolvedRootSlot(slotInfo);
+			slotSharingManager.getResolvedRootSlot(slotInfoAndRemainingResource.getSlotInfo());
 
 		SlotSelectionStrategy.SlotInfoAndLocality slotInfoAndLocality =
 			LocationPreferenceSlotSelectionStrategy.INSTANCE.selectBestSlotForProfile(slotInfos, SlotProfile.noRequirements()).get();
@@ -501,7 +500,7 @@ public class SlotSharingManagerTest extends TestLogger {
 
 		SlotProfile slotProfile = SlotProfile.preferredLocality(ResourceProfile.UNKNOWN, Collections.singleton(taskManagerLocation));
 
-		Collection<SlotInfo> slotInfos = slotSharingManager.listResolvedRootSlotInfo(groupId);
+		Collection<SlotSelectionStrategy.SlotInfoAndResources> slotInfos = slotSharingManager.listResolvedRootSlotInfo(groupId);
 		SlotSelectionStrategy.SlotInfoAndLocality slotInfoAndLocality =
 			LocationPreferenceSlotSelectionStrategy.INSTANCE.selectBestSlotForProfile(slotInfos, slotProfile).get();
 		SlotSharingManager.MultiTaskSlot resolvedRootSlot = slotSharingManager.getResolvedRootSlot(slotInfoAndLocality.getSlotInfo());
@@ -620,6 +619,51 @@ public class SlotSharingManagerTest extends TestLogger {
 	}
 
 	@Test
+	public void testGetResolvedSlotWithResourceConfigured() {
+		ResourceProfile rp1 = new ResourceProfile(1.0, 100);
+		ResourceProfile rp2 = new ResourceProfile(2.0, 200);
+		ResourceProfile allocatedSlotRp = new ResourceProfile(5.0, 500);
+
+		final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions();
+
+		SlotSharingManager slotSharingManager = new SlotSharingManager(
+				SLOT_SHARING_GROUP_ID,
+				allocatedSlotActions,
+				SLOT_OWNER);
+
+		SlotSharingManager.MultiTaskSlot rootSlot = slotSharingManager.createRootSlot(
+				new SlotRequestId(),
+				CompletableFuture.completedFuture(
+						new SimpleSlotContext(
+								new AllocationID(),
+								new LocalTaskManagerLocation(),
+								0,
+								new SimpleAckingTaskManagerGateway(),
+								allocatedSlotRp)),
+				new SlotRequestId());
+
+		rootSlot.allocateSingleTaskSlot(
+				new SlotRequestId(),
+				rp1,
+				new SlotSharingGroupId(),
+				Locality.LOCAL);
+
+		Collection<SlotSelectionStrategy.SlotInfoAndResources> resolvedRoots =
+			slotSharingManager.listResolvedRootSlotInfo(new AbstractID());
+		assertEquals(1, resolvedRoots.size());
+		assertEquals(allocatedSlotRp.subtract(rp1), resolvedRoots.iterator().next().getRemainingResources());
+
+		rootSlot.allocateSingleTaskSlot(
+				new SlotRequestId(),
+				rp2,
+				new SlotSharingGroupId(),
+				Locality.LOCAL);
+		resolvedRoots = slotSharingManager.listResolvedRootSlotInfo(new AbstractID());
+		assertEquals(1, resolvedRoots.size());
+		assertEquals(allocatedSlotRp.subtract(rp1).subtract(rp2), resolvedRoots.iterator().next().getRemainingResources());
+	}
+
+	@Test
 	public void testHashEnoughResourceOfMultiTaskSlot() {
 		ResourceProfile rp1 = new ResourceProfile(1.0, 100);
 		ResourceProfile rp2 = new ResourceProfile(2.0, 200);