You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/25 21:43:59 UTC
[flink] branch release-1.9 updated: [FLINK-12765][coordinator] Add
the remaining resources to the slot selection strategy
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 0b1dafd [FLINK-12765][coordinator] Add the remaining resources to the slot selection strategy
0b1dafd is described below
commit 0b1dafd52d380a8c1b312fa5cf51185367cfd83d
Author: Gao Yun <yu...@alibaba-inc.com>
AuthorDate: Sat Jul 13 09:02:48 2019 +0800
[FLINK-12765][coordinator] Add the remaining resources to the slot selection strategy
---
.../LocationPreferenceSlotSelectionStrategy.java | 25 +++---
.../PreviousAllocationSlotSelectionStrategy.java | 21 +++--
.../runtime/jobmaster/slotpool/SchedulerImpl.java | 11 ++-
.../jobmaster/slotpool/SlotSelectionStrategy.java | 35 ++++++++-
.../jobmaster/slotpool/SlotSharingManager.java | 14 ++--
...ocationPreferenceSlotSelectionStrategyTest.java | 13 +++-
.../types/SlotSelectionStrategyTestBase.java | 15 ++--
.../slotpool/SlotPoolSlotSharingTest.java | 89 ++++++++++++++++++++++
.../jobmaster/slotpool/SlotSharingManagerTest.java | 54 +++++++++++--
9 files changed, 228 insertions(+), 49 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java
index 4325d91..1a80c2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java
@@ -22,7 +22,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import javax.annotation.Nonnull;
@@ -48,7 +47,7 @@ public enum LocationPreferenceSlotSelectionStrategy implements SlotSelectionStra
@Override
public Optional<SlotInfoAndLocality> selectBestSlotForProfile(
- @Nonnull Collection<? extends SlotInfo> availableSlots,
+ @Nonnull Collection<SlotInfoAndResources> availableSlots,
@Nonnull SlotProfile slotProfile) {
Collection<TaskManagerLocation> locationPreferences = slotProfile.getPreferredLocations();
@@ -67,12 +66,12 @@ public enum LocationPreferenceSlotSelectionStrategy implements SlotSelectionStra
@Nonnull
private Optional<SlotInfoAndLocality> selectWithoutLocationPreference(
- @Nonnull Collection<? extends SlotInfo> availableSlots,
+ @Nonnull Collection<SlotInfoAndResources> availableSlots,
@Nonnull ResourceProfile resourceProfile) {
- for (SlotInfo candidate : availableSlots) {
- if (candidate.getResourceProfile().isMatching(resourceProfile)) {
- return Optional.of(SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED));
+ for (SlotInfoAndResources candidate : availableSlots) {
+ if (candidate.getRemainingResources().isMatching(resourceProfile)) {
+ return Optional.of(SlotInfoAndLocality.of(candidate.getSlotInfo(), Locality.UNCONSTRAINED));
}
}
return Optional.empty();
@@ -80,7 +79,7 @@ public enum LocationPreferenceSlotSelectionStrategy implements SlotSelectionStra
@Nonnull
private Optional<SlotInfoAndLocality> selectWitLocationPreference(
- @Nonnull Collection<? extends SlotInfo> availableSlots,
+ @Nonnull Collection<SlotInfoAndResources> availableSlots,
@Nonnull Collection<TaskManagerLocation> locationPreferences,
@Nonnull ResourceProfile resourceProfile) {
@@ -93,21 +92,21 @@ public enum LocationPreferenceSlotSelectionStrategy implements SlotSelectionStra
preferredFQHostNames.merge(locationPreference.getFQDNHostname(), 1, Integer::sum);
}
- SlotInfo bestCandidate = null;
+ SlotInfoAndResources bestCandidate = null;
Locality bestCandidateLocality = Locality.UNKNOWN;
int bestCandidateScore = Integer.MIN_VALUE;
- for (SlotInfo candidate : availableSlots) {
+ for (SlotInfoAndResources candidate : availableSlots) {
- if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+ if (candidate.getRemainingResources().isMatching(resourceProfile)) {
// this gets candidate is local-weigh
Integer localWeigh = preferredResourceIDs.getOrDefault(
- candidate.getTaskManagerLocation().getResourceID(), 0);
+ candidate.getSlotInfo().getTaskManagerLocation().getResourceID(), 0);
// this gets candidate is host-local-weigh
Integer hostLocalWeigh = preferredFQHostNames.getOrDefault(
- candidate.getTaskManagerLocation().getFQDNHostname(), 0);
+ candidate.getSlotInfo().getTaskManagerLocation().getFQDNHostname(), 0);
int candidateScore = LOCALITY_EVALUATION_FUNCTION.apply(localWeigh, hostLocalWeigh);
if (candidateScore > bestCandidateScore) {
@@ -122,7 +121,7 @@ public enum LocationPreferenceSlotSelectionStrategy implements SlotSelectionStra
// at the end of the iteration, we return the candidate with best possible locality or null.
return bestCandidate != null ?
- Optional.of(SlotInfoAndLocality.of(bestCandidate, bestCandidateLocality)) :
+ Optional.of(SlotInfoAndLocality.of(bestCandidate.getSlotInfo(), bestCandidateLocality)) :
Optional.empty();
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
index e2e7bc2..f8feda8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
import javax.annotation.Nonnull;
@@ -41,39 +40,39 @@ public enum PreviousAllocationSlotSelectionStrategy implements SlotSelectionStra
@Override
public Optional<SlotInfoAndLocality> selectBestSlotForProfile(
- @Nonnull Collection<? extends SlotInfo> availableSlots,
+ @Nonnull Collection<SlotInfoAndResources> availableSlots,
@Nonnull SlotProfile slotProfile) {
Collection<AllocationID> priorAllocations = slotProfile.getPreferredAllocations();
// First, if there was a prior allocation try to schedule to the same/old slot
if (!priorAllocations.isEmpty()) {
- for (SlotInfo availableSlot : availableSlots) {
- if (priorAllocations.contains(availableSlot.getAllocationId())) {
+ for (SlotInfoAndResources availableSlot : availableSlots) {
+ if (priorAllocations.contains(availableSlot.getSlotInfo().getAllocationId())) {
return Optional.of(
- SlotInfoAndLocality.of(availableSlot, Locality.LOCAL));
+ SlotInfoAndLocality.of(availableSlot.getSlotInfo(), Locality.LOCAL));
}
}
}
// Second, select based on location preference, excluding blacklisted allocations
Set<AllocationID> blackListedAllocations = slotProfile.getPreviousExecutionGraphAllocations();
- Collection<? extends SlotInfo> availableAndAllowedSlots = computeWithoutBlacklistedSlots(availableSlots, blackListedAllocations);
+ Collection<SlotInfoAndResources> availableAndAllowedSlots = computeWithoutBlacklistedSlots(availableSlots, blackListedAllocations);
return LocationPreferenceSlotSelectionStrategy.INSTANCE.selectBestSlotForProfile(availableAndAllowedSlots, slotProfile);
}
@Nonnull
- private Collection<SlotInfo> computeWithoutBlacklistedSlots(
- @Nonnull Collection<? extends SlotInfo> availableSlots,
+ private Collection<SlotInfoAndResources> computeWithoutBlacklistedSlots(
+ @Nonnull Collection<SlotInfoAndResources> availableSlots,
@Nonnull Set<AllocationID> blacklistedAllocations) {
if (blacklistedAllocations.isEmpty()) {
return Collections.unmodifiableCollection(availableSlots);
}
- ArrayList<SlotInfo> availableAndAllowedSlots = new ArrayList<>(availableSlots.size());
- for (SlotInfo availableSlot : availableSlots) {
- if (!blacklistedAllocations.contains(availableSlot.getAllocationId())) {
+ ArrayList<SlotInfoAndResources> availableAndAllowedSlots = new ArrayList<>(availableSlots.size());
+ for (SlotInfoAndResources availableSlot : availableSlots) {
+ if (!blacklistedAllocations.contains(availableSlot.getSlotInfo().getAllocationId())) {
availableAndAllowedSlots.add(availableSlot);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
index ef87dd1..4bf8eab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotContext;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.ExceptionUtils;
@@ -51,6 +50,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
/**
* Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we
@@ -293,7 +293,11 @@ public class SchedulerImpl implements Scheduler {
@Nonnull SlotRequestId slotRequestId,
@Nonnull SlotProfile slotProfile) {
- Collection<SlotInfo> slotInfoList = slotPool.getAvailableSlotsInformation();
+ Collection<SlotSelectionStrategy.SlotInfoAndResources> slotInfoList =
+ slotPool.getAvailableSlotsInformation()
+ .stream()
+ .map(SlotSelectionStrategy.SlotInfoAndResources::new)
+ .collect(Collectors.toList());
Optional<SlotSelectionStrategy.SlotInfoAndLocality> selectedAvailableSlot =
slotSelectionStrategy.selectBestSlotForProfile(slotInfoList, slotProfile);
@@ -479,7 +483,8 @@ public class SchedulerImpl implements Scheduler {
boolean allowQueuedScheduling,
@Nullable Time allocationTimeout) throws NoResourceAvailableException {
- Collection<SlotInfo> resolvedRootSlotsInfo = slotSharingManager.listResolvedRootSlotInfo(groupId);
+ Collection<SlotSelectionStrategy.SlotInfoAndResources> resolvedRootSlotsInfo =
+ slotSharingManager.listResolvedRootSlotInfo(groupId);
SlotSelectionStrategy.SlotInfoAndLocality bestResolvedRootSlotWithLocality =
slotSelectionStrategy.selectBestSlotForProfile(resolvedRootSlotsInfo, slotProfile).orElse(null);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java
index 1442145..7394115 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobmaster.slotpool;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.SlotInfo;
@@ -36,14 +37,44 @@ public interface SlotSelectionStrategy {
* Selects the best {@link SlotInfo} w.r.t. a certain selection criterion from the provided list of available slots
* and considering the given {@link SlotProfile} that describes the requirements.
*
- * @param availableSlots a list of the available slots to select from.
+ * @param availableSlots a list of the available slots together with their remaining resources to select from.
* @param slotProfile a slot profile, describing requirements for the slot selection.
* @return the selected slot info with the corresponding locality hint.
*/
Optional<SlotInfoAndLocality> selectBestSlotForProfile(
- @Nonnull Collection<? extends SlotInfo> availableSlots,
+ @Nonnull Collection<SlotInfoAndResources> availableSlots,
@Nonnull SlotProfile slotProfile);
+ /**
+ * This class is a value type that combines a {@link SlotInfo} with its remaining {@link ResourceProfile}.
+ */
+ final class SlotInfoAndResources {
+
+ @Nonnull
+ private final SlotInfo slotInfo;
+
+ @Nonnull
+ private final ResourceProfile remainingResources;
+
+ public SlotInfoAndResources(@Nonnull SlotInfo slotInfo) {
+ this(slotInfo, slotInfo.getResourceProfile());
+ }
+
+ public SlotInfoAndResources(@Nonnull SlotInfo slotInfo, @Nonnull ResourceProfile remainingResources) {
+ this.slotInfo = slotInfo;
+ this.remainingResources = remainingResources;
+ }
+
+ @Nonnull
+ public SlotInfo getSlotInfo() {
+ return slotInfo;
+ }
+
+ @Nonnull
+ public ResourceProfile getRemainingResources() {
+ return remainingResources;
+ }
+ }
/**
* This class is a value type that combines a {@link SlotInfo} with a {@link Locality} hint.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index 2c81a91..b2aeed3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -180,14 +180,18 @@ public class SlotSharingManager {
}
@Nonnull
- public Collection<SlotInfo> listResolvedRootSlotInfo(@Nullable AbstractID groupId) {
+ public Collection<SlotSelectionStrategy.SlotInfoAndResources> listResolvedRootSlotInfo(@Nullable AbstractID groupId) {
return resolvedRootSlots
.values()
.stream()
- .flatMap((Map<AllocationID, MultiTaskSlot> map) -> map.values().stream())
- .filter((MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId))
- .map((MultiTaskSlot multiTaskSlot) -> (SlotInfo) multiTaskSlot.getSlotContextFuture().join())
- .collect(Collectors.toList());
+ .flatMap((Map<AllocationID, MultiTaskSlot> map) -> map.values().stream())
+ .filter((MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId))
+ .map((MultiTaskSlot multiTaskSlot) -> {
+ SlotInfo slotInfo = multiTaskSlot.getSlotContextFuture().join();
+ return new SlotSelectionStrategy.SlotInfoAndResources(
+ slotInfo,
+ slotInfo.getResourceProfile().subtract(multiTaskSlot.getReservedResources()));
+ }).collect(Collectors.toList());
}
@Nullable
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java
index 2b1d6b0..378d818 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
+import java.util.stream.Collectors;
public class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionStrategyTestBase {
@@ -61,7 +62,11 @@ public class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionSt
SlotProfile slotProfile = new SlotProfile(ResourceProfile.UNKNOWN, Collections.emptyList(), Collections.emptySet());
Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile);
- Assert.assertTrue(candidates.contains(match.get().getSlotInfo()));
+ Assert.assertTrue(
+ candidates.stream()
+ .map(SlotSelectionStrategy.SlotInfoAndResources::getSlotInfo)
+ .collect(Collectors.toList())
+ .contains(match.get().getSlotInfo()));
}
@Test
@@ -70,7 +75,11 @@ public class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionSt
SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tmlX), Collections.emptySet());
Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile);
- Assert.assertTrue(candidates.contains(match.get().getSlotInfo()));
+ Assert.assertTrue(
+ candidates.stream()
+ .map(SlotSelectionStrategy.SlotInfoAndResources::getSlotInfo)
+ .collect(Collectors.toList())
+ .contains(match.get().getSlotInfo()));
}
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java
index 914f6e3..e420cf4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.clusterframework.types;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
@@ -56,7 +55,7 @@ public abstract class SlotSelectionStrategyTestBase extends TestLogger {
protected final SimpleSlotContext ssc3 = new SimpleSlotContext(aid3, tml3, 3, taskManagerGateway, resourceProfile);
protected final SimpleSlotContext ssc4 = new SimpleSlotContext(aid4, tml4, 4, taskManagerGateway, resourceProfile);
- protected final Set<SlotContext> candidates = Collections.unmodifiableSet(createCandidates());
+ protected final Set<SlotSelectionStrategy.SlotInfoAndResources> candidates = Collections.unmodifiableSet(createCandidates());
protected final SlotSelectionStrategy selectionStrategy;
@@ -64,12 +63,12 @@ public abstract class SlotSelectionStrategyTestBase extends TestLogger {
this.selectionStrategy = slotSelectionStrategy;
}
- private Set<SlotContext> createCandidates() {
- Set<SlotContext> candidates = new HashSet<>(4);
- candidates.add(ssc1);
- candidates.add(ssc2);
- candidates.add(ssc3);
- candidates.add(ssc4);
+ private Set<SlotSelectionStrategy.SlotInfoAndResources> createCandidates() {
+ Set<SlotSelectionStrategy.SlotInfoAndResources> candidates = new HashSet<>(4);
+ candidates.add(new SlotSelectionStrategy.SlotInfoAndResources(ssc1));
+ candidates.add(new SlotSelectionStrategy.SlotInfoAndResources(ssc2));
+ candidates.add(new SlotSelectionStrategy.SlotInfoAndResources(ssc3));
+ candidates.add(new SlotSelectionStrategy.SlotInfoAndResources(ssc4));
return candidates;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
index f2b8fa7..caa3fc3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
@@ -331,6 +331,95 @@ public class SlotPoolSlotSharingTest extends TestLogger {
assertEquals(allocationId2, logicalSlot3.getAllocationId());
}
+ /**
+ * Tests that when matching from the allocated slot, the remaining resources of the slot
+ * will be used instead of the total resource.
+ */
+ @Test
+ public void testSlotSharingRespectsRemainingResource() throws Exception {
+ final ResourceProfile allocatedSlotRp = new ResourceProfile(3.0, 300);
+ final ResourceProfile largeRequestResource = new ResourceProfile(2.0, 200);
+ final ResourceProfile smallRequestResource = new ResourceProfile(1.0, 100);
+
+ final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(2);
+ final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
+ testingResourceManagerGateway.setRequestSlotConsumer(
+ (SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
+
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+ final SlotPoolImpl slotPool = slotPoolResource.getSlotPool();
+ slotPool.registerTaskManager(taskManagerLocation.getResourceID());
+
+ final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+ final JobVertexID jobVertexId1 = new JobVertexID();
+ final JobVertexID jobVertexId2 = new JobVertexID();
+ final JobVertexID jobVertexId3 = new JobVertexID();
+
+ final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
+ CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId1,
+ slotSharingGroupId,
+ null),
+ true,
+ SlotProfile.noLocality(largeRequestResource),
+ TestingUtils.infiniteTime());
+
+ final AllocationID allocationId1 = allocationIds.take();
+
+ // This should fulfill the first request.
+ boolean offerFuture = slotPool.offerSlot(
+ taskManagerLocation,
+ new SimpleAckingTaskManagerGateway(),
+ new SlotOffer(
+ allocationId1,
+ 0,
+ allocatedSlotRp));
+
+ assertTrue(offerFuture);
+ assertTrue(logicalSlotFuture1.isDone());
+ assertEquals(allocationId1, logicalSlotFuture1.get().getAllocationId());
+
+ // The second request should not share the same slot with the first request since it is large.
+ CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId2,
+ slotSharingGroupId,
+ null),
+ true,
+ SlotProfile.noLocality(largeRequestResource),
+ TestingUtils.infiniteTime());
+ assertFalse(logicalSlotFuture2.isDone());
+
+ // The third request should be able to share the same slot with the first request since it is small.
+ CompletableFuture<LogicalSlot> logicalSlotFuture3 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId3,
+ slotSharingGroupId,
+ null),
+ true,
+ SlotProfile.noLocality(smallRequestResource),
+ TestingUtils.infiniteTime());
+ assertTrue(logicalSlotFuture3.isDone());
+ assertEquals(allocationId1, logicalSlotFuture1.get().getAllocationId());
+
+ // The second request should be finally fulfilled by a new slot.
+ final AllocationID allocationId2 = allocationIds.take();
+ // This should fulfill the first two requests.
+ offerFuture = slotPool.offerSlot(
+ taskManagerLocation,
+ new SimpleAckingTaskManagerGateway(),
+ new SlotOffer(
+ allocationId2,
+ 0,
+ allocatedSlotRp));
+
+ assertTrue(offerFuture);
+ assertTrue(logicalSlotFuture2.isDone());
+ assertEquals(allocationId2, logicalSlotFuture2.get().getAllocationId());
+ }
+
@Test
public void testRetryOnSharedSlotOverAllocated() throws InterruptedException, ExecutionException {
final ResourceProfile rp1 = new ResourceProfile(1.0, 100);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
index 9bc1d9b..ecf2e4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotContext;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -439,12 +438,12 @@ public class SlotSharingManagerTest extends TestLogger {
AbstractID groupId = new AbstractID();
- Collection<SlotInfo> slotInfos = slotSharingManager.listResolvedRootSlotInfo(groupId);
+ Collection<SlotSelectionStrategy.SlotInfoAndResources> slotInfos = slotSharingManager.listResolvedRootSlotInfo(groupId);
Assert.assertEquals(1, slotInfos.size());
- SlotInfo slotInfo = slotInfos.iterator().next();
+ SlotSelectionStrategy.SlotInfoAndResources slotInfoAndRemainingResource = slotInfos.iterator().next();
SlotSharingManager.MultiTaskSlot resolvedMultiTaskSlot =
- slotSharingManager.getResolvedRootSlot(slotInfo);
+ slotSharingManager.getResolvedRootSlot(slotInfoAndRemainingResource.getSlotInfo());
SlotSelectionStrategy.SlotInfoAndLocality slotInfoAndLocality =
LocationPreferenceSlotSelectionStrategy.INSTANCE.selectBestSlotForProfile(slotInfos, SlotProfile.noRequirements()).get();
@@ -501,7 +500,7 @@ public class SlotSharingManagerTest extends TestLogger {
SlotProfile slotProfile = SlotProfile.preferredLocality(ResourceProfile.UNKNOWN, Collections.singleton(taskManagerLocation));
- Collection<SlotInfo> slotInfos = slotSharingManager.listResolvedRootSlotInfo(groupId);
+ Collection<SlotSelectionStrategy.SlotInfoAndResources> slotInfos = slotSharingManager.listResolvedRootSlotInfo(groupId);
SlotSelectionStrategy.SlotInfoAndLocality slotInfoAndLocality =
LocationPreferenceSlotSelectionStrategy.INSTANCE.selectBestSlotForProfile(slotInfos, slotProfile).get();
SlotSharingManager.MultiTaskSlot resolvedRootSlot = slotSharingManager.getResolvedRootSlot(slotInfoAndLocality.getSlotInfo());
@@ -620,6 +619,51 @@ public class SlotSharingManagerTest extends TestLogger {
}
@Test
+ public void testGetResolvedSlotWithResourceConfigured() {
+ ResourceProfile rp1 = new ResourceProfile(1.0, 100);
+ ResourceProfile rp2 = new ResourceProfile(2.0, 200);
+ ResourceProfile allocatedSlotRp = new ResourceProfile(5.0, 500);
+
+ final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions();
+
+ SlotSharingManager slotSharingManager = new SlotSharingManager(
+ SLOT_SHARING_GROUP_ID,
+ allocatedSlotActions,
+ SLOT_OWNER);
+
+ SlotSharingManager.MultiTaskSlot rootSlot = slotSharingManager.createRootSlot(
+ new SlotRequestId(),
+ CompletableFuture.completedFuture(
+ new SimpleSlotContext(
+ new AllocationID(),
+ new LocalTaskManagerLocation(),
+ 0,
+ new SimpleAckingTaskManagerGateway(),
+ allocatedSlotRp)),
+ new SlotRequestId());
+
+ rootSlot.allocateSingleTaskSlot(
+ new SlotRequestId(),
+ rp1,
+ new SlotSharingGroupId(),
+ Locality.LOCAL);
+
+ Collection<SlotSelectionStrategy.SlotInfoAndResources> resolvedRoots =
+ slotSharingManager.listResolvedRootSlotInfo(new AbstractID());
+ assertEquals(1, resolvedRoots.size());
+ assertEquals(allocatedSlotRp.subtract(rp1), resolvedRoots.iterator().next().getRemainingResources());
+
+ rootSlot.allocateSingleTaskSlot(
+ new SlotRequestId(),
+ rp2,
+ new SlotSharingGroupId(),
+ Locality.LOCAL);
+ resolvedRoots = slotSharingManager.listResolvedRootSlotInfo(new AbstractID());
+ assertEquals(1, resolvedRoots.size());
+ assertEquals(allocatedSlotRp.subtract(rp1).subtract(rp2), resolvedRoots.iterator().next().getRemainingResources());
+ }
+
+ @Test
public void testHashEnoughResourceOfMultiTaskSlot() {
ResourceProfile rp1 = new ResourceProfile(1.0, 100);
ResourceProfile rp2 = new ResourceProfile(2.0, 200);