You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/07/11 15:37:57 UTC
[flink] 03/05: [FLINK-28144][runtime] Introduce SlotPoolService#releaseFreeSlotsOnTaskManager to release free slots eagerly when blocking nodes
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 04f2f0c2660b312449419a3acb58a46a38d84f64
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Mon Jul 4 14:49:10 2022 +0800
[FLINK-28144][runtime] Introduce SlotPoolService#releaseFreeSlotsOnTaskManager to release free slots eagerly when blocking nodes
---
.../slotpool/DeclarativeSlotPoolService.java | 26 ++++++++++++
.../jobmaster/slotpool/SlotPoolService.java | 8 ++++
.../flink/runtime/jobmaster/JobMasterTest.java | 6 +++
.../slotpool/DeclarativeSlotPoolServiceTest.java | 47 ++++++++++++++++++++++
.../jobmaster/slotpool/TestingSlotPoolService.java | 6 +++
5 files changed, 93 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
index ec1a97005b6..5379c96cfea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
@@ -49,6 +49,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
/** {@link SlotPoolService} implementation for the {@link DeclarativeSlotPool}. */
public class DeclarativeSlotPoolService implements SlotPoolService {
@@ -234,6 +235,31 @@ public class DeclarativeSlotPoolService implements SlotPoolService {
return false;
}
+ @Override
+ public void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause) {
+ assertHasBeenStarted();
+ if (isTaskManagerRegistered(taskManagerId)) {
+ // Collect the ids of this TaskManager's free slots up front, then release them one by one.
+ final Collection<AllocationID> freeSlots =
+ declarativeSlotPool.getFreeSlotsInformation().stream()
+ .filter(
+ slotInfo ->
+ slotInfo.getTaskManagerLocation()
+ .getResourceID()
+ .equals(taskManagerId))
+ .map(SlotInfoWithUtilization::getAllocationId)
+ .collect(Collectors.toSet());
+ for (AllocationID allocationId : freeSlots) {
+ final ResourceCounter previouslyFulfilledRequirement =
+ declarativeSlotPool.releaseSlot(allocationId, cause);
+ // A free slot fulfils no requirement, so releasing it must not report any.
+ Preconditions.checkState(
+ previouslyFulfilledRequirement.isEmpty(),
+ "Released free slot %s had fulfilled requirements %s.", allocationId, previouslyFulfilledRequirement);
+ }
+ }
+ }
+
private void releaseAllTaskManagers(Exception cause) {
for (ResourceID registeredTaskManager : registeredTaskManagers) {
internalReleaseTaskManager(registeredTaskManager, cause);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java
index fc8b3671ee7..b0322dd2c5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java
@@ -113,6 +113,14 @@ public interface SlotPoolService extends AutoCloseable {
*/
boolean releaseTaskManager(ResourceID taskManagerId, Exception cause);
+ /**
+ * Releases all free slots of the given TaskExecutor; slots that are in use are not affected. Does nothing if the TaskExecutor is not registered.
+ *
+ * @param taskManagerId identifies the TaskExecutor whose free slots are released
+ * @param cause cause passed along when releasing the slots
+ */
+ void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause);
+
/**
* Connects the SlotPool to the given ResourceManager. After this method is called, the SlotPool
* will be able to request resources from the given ResourceManager.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index d7a3afbb688..e07ad72516d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -518,6 +518,12 @@ class JobMasterTest {
return true;
}
+ @Override
+ public void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause) {
+ final String unsupported = "TestingSlotPool does not support this operation.";
+ throw new UnsupportedOperationException(unsupported);
+ }
+
@Override
public Collection<SlotOffer> offerSlots(
TaskManagerLocation taskManagerLocation,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
index 21d47552cfb..9d540128848 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
@@ -43,6 +43,8 @@ import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.clock.SystemClock;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
@@ -306,6 +308,51 @@ class DeclarativeSlotPoolServiceTest {
}
}
+ @Test
+ void testReleaseFreeSlotsOnTaskManager() throws Exception {
+ try (DeclarativeSlotPoolService slotPoolService = createDeclarativeSlotPoolService()) {
+ final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ slotPoolService.registerTaskManager(taskManagerLocation.getResourceID());
+ // Every offered slot uses this single-core profile.
+ final ResourceProfile resourceProfile =
+ ResourceProfile.newBuilder().setCpuCores(1).build();
+ // Two slots on the same TaskManager, distinguished by their slot index.
+ final SlotOffer reservedSlotOffer = new SlotOffer(new AllocationID(), 0, resourceProfile);
+ final SlotOffer freeSlotOffer = new SlotOffer(new AllocationID(), 1, resourceProfile);
+ // Declare a requirement for two slots of this profile.
+ final DeclarativeSlotPool slotPool = slotPoolService.getDeclarativeSlotPool();
+ slotPool.setResourceRequirements(ResourceCounter.withResource(resourceProfile, 2));
+ // Records the allocation ids the TaskExecutor gateway is asked to free.
+ final DefaultDeclarativeSlotPoolTest.FreeSlotConsumer freeSlotConsumer =
+ new DefaultDeclarativeSlotPoolTest.FreeSlotConsumer();
+
+ final Collection<SlotOffer> slotOffers = Arrays.asList(reservedSlotOffer, freeSlotOffer);
+
+ slotPoolService.offerSlots(
+ taskManagerLocation,
+ new RpcTaskManagerGateway(
+ new TestingTaskExecutorGatewayBuilder()
+ .setFreeSlotFunction(freeSlotConsumer)
+ .createTestingTaskExecutorGateway(),
+ jobMasterId),
+ slotOffers);
+
+ // Reserve the first slot so that only the second one remains free.
+ slotPool.reserveFreeSlot(reservedSlotOffer.getAllocationId(), resourceProfile);
+
+ slotPoolService.releaseFreeSlotsOnTaskManager(
+ taskManagerLocation.getResourceID(), new FlinkException("Test cause"));
+ // Only the free slot is released (and freed on the TaskExecutor); the reserved one stays.
+ assertThat(slotPool.getFreeSlotsInformation()).isEmpty();
+ assertThat(
+ Iterables.getOnlyElement(slotPool.getAllSlotsInformation())
+ .getAllocationId())
+ .isEqualTo(reservedSlotOffer.getAllocationId());
+ assertThat(Iterables.getOnlyElement(freeSlotConsumer.drainFreedSlots()))
+ .isEqualTo(freeSlotOffer.getAllocationId());
+ }
+ }
+
private DeclarativeSlotPoolService createDeclarativeSlotPoolService() throws Exception {
return createDeclarativeSlotPoolService(new DefaultDeclarativeSlotPoolFactory());
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java
index 295f4059e24..5d10289686b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java
@@ -147,6 +147,12 @@ public class TestingSlotPoolService implements SlotPoolService {
return releaseTaskManagerFunction.apply(taskManagerId, cause);
}
+ @Override
+ public void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause) {
+ final String unsupported = "TestingSlotPoolService does not support this operation.";
+ throw new UnsupportedOperationException(unsupported);
+ }
+
@Override
public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
connectToResourceManagerConsumer.accept(resourceManagerGateway);