Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/29 05:47:17 UTC

[GitHub] [flink] Aitozi opened a new pull request, #19840: [FLINK-24713] Support the initial delay for SlotManager to waiting fo…

Aitozi opened a new pull request, #19840:
URL: https://github.com/apache/flink/pull/19840

   …r taskManager to register
   
   
   ## What is the purpose of the change
   
   This PR is an alternative solution to the problem of requesting extra workers after a JobManager failover. After an offline discussion with @KarmaGYZ, we decided not to introduce the new gateway interface `getRecoveryFuture`. Instead, we extend the existing `checkResourceRequirementsWithDelay` so that it can wait longer before triggering the resource check, with separate configurable options for the long delay and the normal delay. The same change applies to the DeclarativeSlotManager.
   
   This work is largely based on @KarmaGYZ's previous work. Thanks for the guidance and suggestions :).
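
A minimal sketch of the one-shot long delay described above, for readers following the thread. The class `RequirementsCheckDelayPolicy` and its method names are hypothetical and not part of Flink; only the idea (use the long delay for the first check after `enlargeRequirementsCheckDelayOnce`, then fall back to the normal delay) comes from the PR. The sketch resets the flag when the delay is read, which is an assumption about timing rather than the PR's exact behavior.

```java
import java.time.Duration;

/** Hypothetical helper (not Flink code): picks the delay for the next requirements check. */
final class RequirementsCheckDelayPolicy {
    private final Duration normalDelay;
    private final Duration longDelay;
    // True only until the next check is scheduled.
    private boolean useLongDelayOnce;

    RequirementsCheckDelayPolicy(Duration normalDelay, Duration longDelay) {
        this.normalDelay = normalDelay;
        this.longDelay = longDelay;
    }

    /** Requests that the next check (and only that one) uses the long delay. */
    void enlargeDelayOnce() {
        useLongDelayOnce = true;
    }

    /** Returns the delay for the next check and clears the one-shot flag. */
    Duration nextDelay() {
        final Duration delay = useLongDelayOnce ? longDelay : normalDelay;
        useLongDelayOnce = false;
        return delay;
    }
}
```

With a normal delay of 50 ms and a long delay of 3 s (the defaults shown in the diff under review), the first call to `nextDelay()` after `enlargeDelayOnce()` yields 3 s and every later call yields 50 ms.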
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Aitozi commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r917544164


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -504,8 +522,11 @@ private void checkResourceRequirementsWithDelay() {
                                             Preconditions.checkNotNull(requirementsCheckFuture)
                                                     .complete(null);
                                         }),
-                        requirementsCheckDelay.toMillis(),
+                        delay,
                         TimeUnit.MILLISECONDS);
+                isRequirementsCheckLongDelay = false;
+            } else {
+                checkResourceRequirements();

Review Comment:
   👌🏻





[GitHub] [flink] Aitozi commented on pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #19840:
URL: https://github.com/apache/flink/pull/19840#issuecomment-1180165316

   @xintongsong Thanks for your valuable comments. I have also thought about the trade-off you mentioned. My first PR, https://github.com/apache/flink/pull/19786/files, which introduces the recoveryFuture, may avoid that trade-off, and it does not have to touch the slotManager part. (After your comments, I think it should be improved so that it does not delay the SlotPool connection and takes effect inside the ResourceManager only.) Could you also take a look at that one?




[GitHub] [flink] Aitozi commented on pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #19840:
URL: https://github.com/apache/flink/pull/19840#issuecomment-1155047668

   @flinkbot run azure




[GitHub] [flink] Aitozi commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r900711478


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -504,8 +522,11 @@ private void checkResourceRequirementsWithDelay() {
                                             Preconditions.checkNotNull(requirementsCheckFuture)
                                                     .complete(null);
                                         }),
-                        requirementsCheckDelay.toMillis(),
+                        delay,
                         TimeUnit.MILLISECONDS);
+                isRequirementsCheckLongDelay = false;
+            } else {
+                checkResourceRequirements();

Review Comment:
   BTW, I found that the two slot managers have exactly the same method `checkResourceRequirementsWithDelay`. Should we add an abstract class to hold it and avoid the duplicated methods?
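
One way the shared method could be pulled into a common base class is sketched below. This is only an illustration of the question raised here: `DelayedRequirementsCheckBase` is a hypothetical name, and the `BiConsumer<Runnable, Long>` scheduler is a stand-in for the real scheduled executor, not a Flink API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/**
 * Hypothetical base class (not part of Flink) that would own the delayed-check logic
 * shared by both slot managers.
 */
abstract class DelayedRequirementsCheckBase {
    private final long delayMillis;
    // Stand-in for the real scheduled executor: accepts a task and a delay in milliseconds.
    private final BiConsumer<Runnable, Long> scheduler;
    private CompletableFuture<Void> requirementsCheckFuture;

    DelayedRequirementsCheckBase(long delayMillis, BiConsumer<Runnable, Long> scheduler) {
        this.delayMillis = delayMillis;
        this.scheduler = scheduler;
    }

    /** Schedules one check unless a previously scheduled check is still pending. */
    final void checkResourceRequirementsWithDelay() {
        if (requirementsCheckFuture != null && !requirementsCheckFuture.isDone()) {
            return;
        }
        final CompletableFuture<Void> future = new CompletableFuture<>();
        requirementsCheckFuture = future;
        scheduler.accept(
                () -> {
                    checkResourceRequirements();
                    future.complete(null);
                },
                delayMillis);
    }

    /** Each concrete slot manager supplies its own check. */
    protected abstract void checkResourceRequirements();
}
```

Each slot manager would then only implement `checkResourceRequirements()`. Whether that is worth a new class hierarchy is the open question in this comment.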





[GitHub] [flink] Aitozi commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r884246309


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java:
##########
@@ -79,8 +81,19 @@ public class DeclarativeSlotManager implements SlotManager {
     private final Map<JobID, String> jobMasterTargetAddresses = new HashMap<>();
     private final Map<SlotID, AllocationID> pendingSlotAllocations;
 
+    /** Delay of the requirement change check in the slot manager. */
+    private final Duration requirementsCheckDelay;

Review Comment:
   Do you mean creating another ticket for this?





[GitHub] [flink] Aitozi commented on pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #19840:
URL: https://github.com/apache/flink/pull/19840#issuecomment-1193265774

   Closed in favor of https://github.com/apache/flink/pull/20256




[GitHub] [flink] Aitozi commented on pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #19840:
URL: https://github.com/apache/flink/pull/19840#issuecomment-1165496424

   The CI failure seems unrelated; please take another look.




[GitHub] [flink] KarmaGYZ commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r899686715


##########
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java:
##########
@@ -150,6 +150,18 @@ public class ResourceManagerOptions {
                     .defaultValue(Duration.ofMillis(50))
                     .withDescription("The delay of the resource requirements check.");
 
+    /**
+     * The long delay of requirements check. This is only used for waiting for the previous
+     * TaskManagers registration and will only take effect in the first requirements check.
+     */
+    public static final ConfigOption<Duration> REQUIREMENTS_CHECK_LONG_DELAY =

Review Comment:
   ```suggestion
       @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
       public static final ConfigOption<Duration> REQUIREMENTS_CHECK_LONG_DELAY =
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -491,10 +504,15 @@ public void freeSlot(SlotID slotId, AllocationID allocationId) {
      * are performed with a slight delay.
      */
     private void checkResourceRequirementsWithDelay() {
-        if (requirementsCheckDelay.toMillis() <= 0) {
-            checkResourceRequirements();
-        } else {
-            if (requirementsCheckFuture == null || requirementsCheckFuture.isDone()) {
+        long delay =
+                isRequirementsCheckLongDelay
+                        ? requirementsCheckLongDelay.toMillis()
+                        : requirementsCheckDelay.toMillis();
+        boolean pendingRequirementsCheck =

Review Comment:
   ```suggestion
           boolean hasPendingRequirementsCheck =
   ```



##########
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java:
##########
@@ -150,6 +150,18 @@ public class ResourceManagerOptions {
                     .defaultValue(Duration.ofMillis(50))
                     .withDescription("The delay of the resource requirements check.");
 
+    /**
+     * The long delay of requirements check. This is only used for waiting for the previous
+     * TaskManagers registration and will only take effect in the first requirements check.
+     */
+    public static final ConfigOption<Duration> REQUIREMENTS_CHECK_LONG_DELAY =
+            ConfigOptions.key("slotmanager.requirement-check.long-delay")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(3))

Review Comment:
   I think the default value might be too long. 1s or 500ms might be good enough?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java:
##########
@@ -434,10 +446,15 @@ public void freeSlot(SlotID slotId, AllocationID allocationId) {
      * are performed with a slight delay.
      */
     private void checkResourceRequirementsWithDelay() {

Review Comment:
   Same as the `FineGrainedSlotManager`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -504,8 +522,11 @@ private void checkResourceRequirementsWithDelay() {
                                             Preconditions.checkNotNull(requirementsCheckFuture)
                                                     .complete(null);
                                         }),
-                        requirementsCheckDelay.toMillis(),
+                        delay,
                         TimeUnit.MILLISECONDS);
+                isRequirementsCheckLongDelay = false;
+            } else {
+                checkResourceRequirements();

Review Comment:
   It seems a bit complicated here. We just need to ensure the delay is no less than zero. Then, the `ScheduledExecutor#schedule` will help us to handle it.
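
A rough sketch of the simplification suggested above: clamp the delay to zero and always go through the scheduler, rather than branching into a direct call. It uses the JDK's `ScheduledExecutorService` as a stand-in for Flink's `ScheduledExecutor`, and the class and method names are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration (not Flink code) of scheduling with a clamped delay. */
final class ClampedDelayScheduling {

    /** Ensures the delay passed to the scheduler is never negative. */
    static long clampDelayMillis(long delayMillis) {
        return Math.max(0L, delayMillis);
    }

    /** Always schedules the check; a zero delay simply runs it as soon as possible. */
    static ScheduledFuture<?> scheduleCheck(
            ScheduledExecutorService executor, Runnable check, long delayMillis) {
        return executor.schedule(check, clampDelayMillis(delayMillis), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        final AtomicInteger checks = new AtomicInteger();
        try {
            // A non-positive configured delay no longer needs a separate code path.
            scheduleCheck(executor, checks::incrementAndGet, -5).get();
            scheduleCheck(executor, checks::incrementAndGet, 10).get();
            System.out.println("checks run: " + checks.get());
        } finally {
            executor.shutdown();
        }
    }
}
```

The trade-off is that a zero-delay check becomes asynchronous instead of running inline, which callers and tests would need to account for.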



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java:
##########
@@ -752,4 +753,76 @@ public void testAllocationUpdatesIgnoredIfSlotMarkedAsAllocatedAfterSlotReport()
 
         System.setSecurityManager(null);
     }
+
+    @Test
+    public void testRequirementLongDelayOnlyTakeEffectOnce() throws Exception {
+        final CompletableFuture<Void> allocationIdFuture1 = new CompletableFuture<>();
+        final CompletableFuture<Void> allocationIdFuture2 = new CompletableFuture<>();
+        final TaskExecutorGateway taskExecutorGateway1 =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setRequestSlotFunction(
+                                tuple6 -> {
+                                    allocationIdFuture1.complete(null);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .createTestingTaskExecutorGateway();
+        final TaskExecutorGateway taskExecutorGateway2 =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setRequestSlotFunction(
+                                tuple6 -> {
+                                    allocationIdFuture2.complete(null);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .createTestingTaskExecutorGateway();
+        final TaskExecutorConnection taskExecutorConnection1 =
+                new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway1);
+        final TaskExecutorConnection taskExecutorConnection2 =
+                new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway2);
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            final CompletableFuture<Boolean> registerTaskManagerFuture1 =
+                                    new CompletableFuture<>();
+                            final CompletableFuture<Boolean> registerTaskManagerFuture2 =
+                                    new CompletableFuture<>();
+                            runInMainThread(
+                                    () -> {
+                                        getSlotManager().enlargeRequirementsCheckDelayOnce();
+                                        getSlotManager()
+                                                .processResourceRequirements(
+                                                        createResourceRequirements(
+                                                                new JobID(),
+                                                                2,
+                                                                DEFAULT_SLOT_RESOURCE_PROFILE));
+                                        registerTaskManagerFuture1.complete(
+                                                getSlotManager()
+                                                        .registerTaskManager(
+                                                                taskExecutorConnection1,
+                                                                new SlotReport(),
+                                                                DEFAULT_SLOT_RESOURCE_PROFILE,
+                                                                DEFAULT_SLOT_RESOURCE_PROFILE));
+                                    });
+                            assertThat(
+                                    assertFutureCompleteAndReturn(registerTaskManagerFuture1),
+                                    is(true));
+                            assertFutureNotComplete(allocationIdFuture1);
+                            allocationIdFuture1.get(200, TimeUnit.MILLISECONDS);

Review Comment:
   It's a bit fragile. Maybe we can enlarge the `requirementCheckLongDelay` to 400ms.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java:
##########
@@ -752,4 +753,76 @@ public void testAllocationUpdatesIgnoredIfSlotMarkedAsAllocatedAfterSlotReport()
 
         System.setSecurityManager(null);
     }
+
+    @Test
+    public void testRequirementLongDelayOnlyTakeEffectOnce() throws Exception {
+        final CompletableFuture<Void> allocationIdFuture1 = new CompletableFuture<>();
+        final CompletableFuture<Void> allocationIdFuture2 = new CompletableFuture<>();
+        final TaskExecutorGateway taskExecutorGateway1 =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setRequestSlotFunction(
+                                tuple6 -> {
+                                    allocationIdFuture1.complete(null);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .createTestingTaskExecutorGateway();
+        final TaskExecutorGateway taskExecutorGateway2 =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setRequestSlotFunction(
+                                tuple6 -> {
+                                    allocationIdFuture2.complete(null);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .createTestingTaskExecutorGateway();
+        final TaskExecutorConnection taskExecutorConnection1 =
+                new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway1);
+        final TaskExecutorConnection taskExecutorConnection2 =
+                new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway2);
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            final CompletableFuture<Boolean> registerTaskManagerFuture1 =
+                                    new CompletableFuture<>();
+                            final CompletableFuture<Boolean> registerTaskManagerFuture2 =
+                                    new CompletableFuture<>();
+                            runInMainThread(
+                                    () -> {
+                                        getSlotManager().enlargeRequirementsCheckDelayOnce();
+                                        getSlotManager()
+                                                .processResourceRequirements(
+                                                        createResourceRequirements(
+                                                                new JobID(),
+                                                                2,
+                                                                DEFAULT_SLOT_RESOURCE_PROFILE));
+                                        registerTaskManagerFuture1.complete(
+                                                getSlotManager()
+                                                        .registerTaskManager(
+                                                                taskExecutorConnection1,
+                                                                new SlotReport(),
+                                                                DEFAULT_SLOT_RESOURCE_PROFILE,
+                                                                DEFAULT_SLOT_RESOURCE_PROFILE));
+                                    });
+                            assertThat(
+                                    assertFutureCompleteAndReturn(registerTaskManagerFuture1),
+                                    is(true));
+                            assertFutureNotComplete(allocationIdFuture1);
+                            allocationIdFuture1.get(200, TimeUnit.MILLISECONDS);

Review Comment:
   And here, we just use `assertFutureCompleteAndReturn`.





[GitHub] [flink] Aitozi commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r901209861


##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java:
##########
@@ -1466,6 +1466,52 @@ public void testProcessResourceRequirementsWithDelay() throws Exception {
         }
     }
 
+    @Test
+    public void testProcessResourceRequirementsWithLongDelay() throws Exception {
+        final ResourceTracker resourceTracker = new DefaultResourceTracker();
+        final AtomicInteger allocatedResourceCounter = new AtomicInteger(0);
+        final ManuallyTriggeredScheduledExecutor scheduledExecutor =
+                new ManuallyTriggeredScheduledExecutor();
+        final Duration longDelay = Duration.ofSeconds(1);
+        final Duration delay = Duration.ofMillis(500);
+        try (final DeclarativeSlotManager slotManager =
+                createDeclarativeSlotManagerBuilder(scheduledExecutor)
+                        .setResourceTracker(resourceTracker)
+                        .setRequirementCheckDelay(delay)
+                        .setRequirementCheckLongDelay(longDelay)
+                        .buildAndStartWithDirectExec(
+                                ResourceManagerId.generate(),
+                                new TestingResourceActionsBuilder()
+                                        .setAllocateResourceConsumer(
+                                                workerResourceSpec ->
+                                                        allocatedResourceCounter.getAndIncrement())
+                                        .build())) {
+
+            final JobID jobId = new JobID();
+
+            slotManager.enlargeRequirementsCheckDelayOnce();
+
+            slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
+            Assertions.assertEquals(0, allocatedResourceCounter.get());

Review Comment:
   thanks, fixed now





[GitHub] [flink] reswqa commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r901274198


##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java:
##########
@@ -1449,20 +1449,70 @@ public void testProcessResourceRequirementsWithDelay() throws Exception {
             final JobID jobId = new JobID();
 
             slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
-            Assertions.assertEquals(0, allocatedResourceCounter.get());
-            Assertions.assertEquals(
-                    1, scheduledExecutor.getActiveNonPeriodicScheduledTask().size());
+            Assertions.assertThat(allocatedResourceCounter.get()).isEqualTo(0);
+            Assertions.assertThat(scheduledExecutor.getActiveNonPeriodicScheduledTask().size())
+                    .isEqualTo(1);
             final ScheduledFuture<?> future =
                     scheduledExecutor.getActiveNonPeriodicScheduledTask().iterator().next();
-            Assertions.assertEquals(delay.toMillis(), future.getDelay(TimeUnit.MILLISECONDS));
+            Assertions.assertThat(future.getDelay(TimeUnit.MILLISECONDS))
+                    .isEqualTo(delay.toMillis());
 
             // the second request is skipped
             slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
-            Assertions.assertEquals(
-                    1, scheduledExecutor.getActiveNonPeriodicScheduledTask().size());
+            Assertions.assertThat(scheduledExecutor.getActiveNonPeriodicScheduledTask().size())
+                    .isEqualTo(1);
 
             scheduledExecutor.triggerNonPeriodicScheduledTask();
-            Assertions.assertEquals(1, allocatedResourceCounter.get());
+            Assertions.assertThat(allocatedResourceCounter.get()).isEqualTo(1);
+        }
+    }
+
+    @Test
+    public void testProcessResourceRequirementsWithLongDelay() throws Exception {
+        final ResourceTracker resourceTracker = new DefaultResourceTracker();
+        final AtomicInteger allocatedResourceCounter = new AtomicInteger(0);
+        final ManuallyTriggeredScheduledExecutor scheduledExecutor =
+                new ManuallyTriggeredScheduledExecutor();
+        final Duration longDelay = Duration.ofSeconds(1);
+        final Duration delay = Duration.ofMillis(500);
+        try (final DeclarativeSlotManager slotManager =
+                createDeclarativeSlotManagerBuilder(scheduledExecutor)
+                        .setResourceTracker(resourceTracker)
+                        .setRequirementCheckDelay(delay)
+                        .setRequirementCheckLongDelay(longDelay)
+                        .buildAndStartWithDirectExec(
+                                ResourceManagerId.generate(),
+                                new TestingResourceActionsBuilder()
+                                        .setAllocateResourceConsumer(
+                                                workerResourceSpec ->
+                                                        allocatedResourceCounter.getAndIncrement())
+                                        .build())) {
+
+            final JobID jobId = new JobID();
+
+            slotManager.enlargeRequirementsCheckDelayOnce();
+
+            slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
+
+            Assertions.assertThat(allocatedResourceCounter.get()).isEqualTo(0);
+            Assertions.assertThat(scheduledExecutor.getActiveNonPeriodicScheduledTask().size())

Review Comment:
   ```suggestion
              Assertions.assertThat(scheduledExecutor.getActiveNonPeriodicScheduledTask()).hasSize(1);
   ```





[GitHub] [flink] Aitozi commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r884374610


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java:
##########
@@ -79,8 +81,19 @@ public class DeclarativeSlotManager implements SlotManager {
     private final Map<JobID, String> jobMasterTargetAddresses = new HashMap<>();
     private final Map<SlotID, AllocationID> pendingSlotAllocations;
 
+    /** Delay of the requirement change check in the slot manager. */
+    private final Duration requirementsCheckDelay;

Review Comment:
   Got it, will do.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java:
##########
@@ -79,8 +81,19 @@ public class DeclarativeSlotManager implements SlotManager {
     private final Map<JobID, String> jobMasterTargetAddresses = new HashMap<>();
     private final Map<SlotID, AllocationID> pendingSlotAllocations;
 
+    /** Delay of the requirement change check in the slot manager. */
+    private final Duration requirementsCheckDelay;

Review Comment:
   Do you mean creating another ticket for this?





[GitHub] [flink] KarmaGYZ commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r917525528


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -504,8 +522,11 @@ private void checkResourceRequirementsWithDelay() {
                                             Preconditions.checkNotNull(requirementsCheckFuture)
                                                     .complete(null);
                                         }),
-                        requirementsCheckDelay.toMillis(),
+                        delay,
                         TimeUnit.MILLISECONDS);
+                isRequirementsCheckLongDelay = false;
+            } else {
+                checkResourceRequirements();

Review Comment:
   I think it is a bit tricky and will harm the test stability.





[GitHub] [flink] Aitozi commented on pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #19840:
URL: https://github.com/apache/flink/pull/19840#issuecomment-1179638147

   @flinkbot run azure




[GitHub] [flink] reswqa commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r901126216


##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java:
##########
@@ -1466,6 +1466,52 @@ public void testProcessResourceRequirementsWithDelay() throws Exception {
         }
     }
 
+    @Test
+    public void testProcessResourceRequirementsWithLongDelay() throws Exception {
+        final ResourceTracker resourceTracker = new DefaultResourceTracker();
+        final AtomicInteger allocatedResourceCounter = new AtomicInteger(0);
+        final ManuallyTriggeredScheduledExecutor scheduledExecutor =
+                new ManuallyTriggeredScheduledExecutor();
+        final Duration longDelay = Duration.ofSeconds(1);
+        final Duration delay = Duration.ofMillis(500);
+        try (final DeclarativeSlotManager slotManager =
+                createDeclarativeSlotManagerBuilder(scheduledExecutor)
+                        .setResourceTracker(resourceTracker)
+                        .setRequirementCheckDelay(delay)
+                        .setRequirementCheckLongDelay(longDelay)
+                        .buildAndStartWithDirectExec(
+                                ResourceManagerId.generate(),
+                                new TestingResourceActionsBuilder()
+                                        .setAllocateResourceConsumer(
+                                                workerResourceSpec ->
+                                                        allocatedResourceCounter.getAndIncrement())
+                                        .build())) {
+
+            final JobID jobId = new JobID();
+
+            slotManager.enlargeRequirementsCheckDelayOnce();
+
+            slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
+            Assertions.assertEquals(0, allocatedResourceCounter.get());

Review Comment:
   You should use AssertJ's `assertThat().isEqualTo()` here.





[GitHub] [flink] Aitozi commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r917512143


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -504,8 +522,11 @@ private void checkResourceRequirementsWithDelay() {
                                             Preconditions.checkNotNull(requirementsCheckFuture)
                                                     .complete(null);
                                         }),
-                        requirementsCheckDelay.toMillis(),
+                        delay,
                         TimeUnit.MILLISECONDS);
+                isRequirementsCheckLongDelay = false;
+            } else {
+                checkResourceRequirements();

Review Comment:
   I mean that most tests around `DeclarativeSlotManager` exercise it synchronously, so keeping the old behavior lets them stay as they are. But I can replace those checks with a short timeout wait. I will push a commit for it.





[GitHub] [flink] KarmaGYZ commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r917515028


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -504,8 +522,11 @@ private void checkResourceRequirementsWithDelay() {
                                             Preconditions.checkNotNull(requirementsCheckFuture)
                                                     .complete(null);
                                         }),
-                        requirementsCheckDelay.toMillis(),
+                        delay,
                         TimeUnit.MILLISECONDS);
+                isRequirementsCheckLongDelay = false;
+            } else {
+                checkResourceRequirements();

Review Comment:
   What do you mean by "replace it with a slight timeout wait"?





[GitHub] [flink] Aitozi commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r890203960


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java:
##########
@@ -79,8 +81,19 @@ public class DeclarativeSlotManager implements SlotManager {
     private final Map<JobID, String> jobMasterTargetAddresses = new HashMap<>();
     private final Map<SlotID, AllocationID> pendingSlotAllocations;
 
+    /** Delay of the requirement change check in the slot manager. */
+    private final Duration requirementsCheckDelay;

Review Comment:
   A preceding PR, https://github.com/apache/flink/pull/19886, has been opened. Please take a look, thanks.





[GitHub] [flink] Aitozi commented on pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #19840:
URL: https://github.com/apache/flink/pull/19840#issuecomment-1154742585

   Hi @KarmaGYZ, I have rebased this PR on the previous commit. Please take another look, thanks.




[GitHub] [flink] Aitozi commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r900707375


##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java:
##########
@@ -752,4 +753,76 @@ public void testAllocationUpdatesIgnoredIfSlotMarkedAsAllocatedAfterSlotReport()
 
         System.setSecurityManager(null);
     }
+
+    @Test
+    public void testRequirementLongDelayOnlyTakeEffectOnce() throws Exception {
+        final CompletableFuture<Void> allocationIdFuture1 = new CompletableFuture<>();
+        final CompletableFuture<Void> allocationIdFuture2 = new CompletableFuture<>();
+        final TaskExecutorGateway taskExecutorGateway1 =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setRequestSlotFunction(
+                                tuple6 -> {
+                                    allocationIdFuture1.complete(null);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .createTestingTaskExecutorGateway();
+        final TaskExecutorGateway taskExecutorGateway2 =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setRequestSlotFunction(
+                                tuple6 -> {
+                                    allocationIdFuture2.complete(null);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .createTestingTaskExecutorGateway();
+        final TaskExecutorConnection taskExecutorConnection1 =
+                new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway1);
+        final TaskExecutorConnection taskExecutorConnection2 =
+                new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway2);
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            final CompletableFuture<Boolean> registerTaskManagerFuture1 =
+                                    new CompletableFuture<>();
+                            final CompletableFuture<Boolean> registerTaskManagerFuture2 =
+                                    new CompletableFuture<>();
+                            runInMainThread(
+                                    () -> {
+                                        getSlotManager().enlargeRequirementsCheckDelayOnce();
+                                        getSlotManager()
+                                                .processResourceRequirements(
+                                                        createResourceRequirements(
+                                                                new JobID(),
+                                                                2,
+                                                                DEFAULT_SLOT_RESOURCE_PROFILE));
+                                        registerTaskManagerFuture1.complete(
+                                                getSlotManager()
+                                                        .registerTaskManager(
+                                                                taskExecutorConnection1,
+                                                                new SlotReport(),
+                                                                DEFAULT_SLOT_RESOURCE_PROFILE,
+                                                                DEFAULT_SLOT_RESOURCE_PROFILE));
+                                    });
+                            assertThat(
+                                    assertFutureCompleteAndReturn(registerTaskManagerFuture1),
+                                    is(true));
+                            assertFutureNotComplete(allocationIdFuture1);
+                            allocationIdFuture1.get(200, TimeUnit.MILLISECONDS);

Review Comment:
   I have simplified this test case and switched it to AssertJ. Please take another look.





[GitHub] [flink] reswqa commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r899809302


##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java:
##########
@@ -752,4 +753,76 @@ public void testAllocationUpdatesIgnoredIfSlotMarkedAsAllocatedAfterSlotReport()
 
         System.setSecurityManager(null);
     }
+
+    @Test
+    public void testRequirementLongDelayOnlyTakeEffectOnce() throws Exception {
+        final CompletableFuture<Void> allocationIdFuture1 = new CompletableFuture<>();
+        final CompletableFuture<Void> allocationIdFuture2 = new CompletableFuture<>();
+        final TaskExecutorGateway taskExecutorGateway1 =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setRequestSlotFunction(
+                                tuple6 -> {
+                                    allocationIdFuture1.complete(null);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .createTestingTaskExecutorGateway();
+        final TaskExecutorGateway taskExecutorGateway2 =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setRequestSlotFunction(
+                                tuple6 -> {
+                                    allocationIdFuture2.complete(null);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .createTestingTaskExecutorGateway();
+        final TaskExecutorConnection taskExecutorConnection1 =
+                new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway1);
+        final TaskExecutorConnection taskExecutorConnection2 =
+                new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway2);
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            final CompletableFuture<Boolean> registerTaskManagerFuture1 =
+                                    new CompletableFuture<>();
+                            final CompletableFuture<Boolean> registerTaskManagerFuture2 =
+                                    new CompletableFuture<>();
+                            runInMainThread(
+                                    () -> {
+                                        getSlotManager().enlargeRequirementsCheckDelayOnce();
+                                        getSlotManager()
+                                                .processResourceRequirements(
+                                                        createResourceRequirements(
+                                                                new JobID(),
+                                                                2,
+                                                                DEFAULT_SLOT_RESOURCE_PROFILE));
+                                        registerTaskManagerFuture1.complete(
+                                                getSlotManager()
+                                                        .registerTaskManager(
+                                                                taskExecutorConnection1,
+                                                                new SlotReport(),
+                                                                DEFAULT_SLOT_RESOURCE_PROFILE,
+                                                                DEFAULT_SLOT_RESOURCE_PROFILE));
+                                    });
+                            assertThat(
+                                    assertFutureCompleteAndReturn(registerTaskManagerFuture1),
+                                    is(true));
+                            assertFutureNotComplete(allocationIdFuture1);
+                            allocationIdFuture1.get(200, TimeUnit.MILLISECONDS);

Review Comment:
   Just a small suggestion: newly introduced tests should use AssertJ, so the code would look like this:
   ```Assertions.assertThat(allocationIdFuture1).succeedsWithin(FUTURE_TIMEOUT_SECOND, TimeUnit.SECONDS)```
   Of course, it would be even better if the entire test class could be migrated to JUnit 5 along the way. The runtime is a very large module, and migrating it to JUnit 5 and AssertJ requires a lot of work. If new tests do not follow this, the migration will be more difficult later.





[GitHub] [flink] Aitozi commented on pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #19840:
URL: https://github.com/apache/flink/pull/19840#issuecomment-1180203351

   Thanks, I will try to improve it as you suggested.




[GitHub] [flink] KarmaGYZ commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r917500734


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -504,8 +522,11 @@ private void checkResourceRequirementsWithDelay() {
                                             Preconditions.checkNotNull(requirementsCheckFuture)
                                                     .complete(null);
                                         }),
-                        requirementsCheckDelay.toMillis(),
+                        delay,
                         TimeUnit.MILLISECONDS);
+                isRequirementsCheckLongDelay = false;
+            } else {
+                checkResourceRequirements();

Review Comment:
   > I tried this approach, but found that it changes this method's behavior to async when the delay is 0, whereas the current behavior is sync. So I kept the current implementation, and most tests do not have to be touched.
   
   I don't think this method by design is guaranteed to be triggered synchronously when the delay is zero.
   
   It's always good to do deduplication work. But I think the `DeclarativeSlotManager` will be entirely replaced by the `FineGrainedSlotManager` in the near future. So, we do not need to do it atm.
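
The behavior under discussion can be illustrated with a minimal sketch. It assumes the delayed check is scheduled only when the effective delay is positive and otherwise runs inline, and that the long delay is consumed by the first scheduled check. The class `DelayOnceSketch` and its members are hypothetical and are not Flink API; the sketch records actions instead of scheduling real work, and it leaves out the pending-check future seen in the diff.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a slot manager; none of these names are Flink API.
class DelayOnceSketch {
    private final Duration normalDelay;
    private final Duration longDelay;
    // Set by enlargeDelayOnce(), cleared once a check has been scheduled.
    private boolean useLongDelayOnce;
    // Records what happened instead of really scheduling anything.
    private final List<String> actions = new ArrayList<>();

    DelayOnceSketch(Duration normalDelay, Duration longDelay) {
        this.normalDelay = normalDelay;
        this.longDelay = longDelay;
    }

    void enlargeDelayOnce() {
        useLongDelayOnce = true;
    }

    void checkWithDelay() {
        long delayMillis = (useLongDelayOnce ? longDelay : normalDelay).toMillis();
        if (delayMillis > 0) {
            // Assumed: a positive delay goes through the scheduler (async).
            actions.add("scheduled after " + delayMillis + " ms");
            useLongDelayOnce = false;
        } else {
            // Assumed: a zero delay runs the check inline (sync).
            actions.add("checked synchronously");
        }
    }

    static String demo() {
        DelayOnceSketch withDelay =
                new DelayOnceSketch(Duration.ofMillis(500), Duration.ofSeconds(1));
        withDelay.enlargeDelayOnce();
        withDelay.checkWithDelay(); // uses the long delay
        withDelay.checkWithDelay(); // falls back to the normal delay

        DelayOnceSketch zeroDelay = new DelayOnceSketch(Duration.ZERO, Duration.ZERO);
        zeroDelay.checkWithDelay(); // no scheduling at all

        List<String> all = new ArrayList<>(withDelay.actions);
        all.addAll(zeroDelay.actions);
        return String.join("; ", all);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Removing the `else` branch and always scheduling, even with a zero delay, is the variant that would make the zero-delay case asynchronous.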





[GitHub] [flink] xintongsong commented on pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #19840:
URL: https://github.com/apache/flink/pull/19840#issuecomment-1180179352

   @Aitozi I think the previous PR needs to be improved, but the general idea of having a `recoveryFuture` aligns with my suggestions.
   
   This future should be completed in any of these cases:
   - The recovered workers become empty
   - The timeout is reached
   - For the standalone resource manager, at the very beginning
   
   And all the `processResourceRequirements` (or `declareRequiredResources`) should be wrapped with `whenComplete` on this future.
   
   The implementation should be simplified with awareness of:
   1. If a completable future is completed multiple times, only the first time matters.
   2. Calling `whenComplete` on a future that is already completed will trigger the execution immediately.
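
The two `CompletableFuture` properties listed above can be checked with plain JDK code. The sketch below is only an illustration: `recoveryFuture` follows the name used in the comment, while the class name and the logged strings are made up, and nothing here is Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative only; the class name and log messages are hypothetical.
class RecoveryFutureSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        CompletableFuture<Void> recoveryFuture = new CompletableFuture<>();

        // Requirements declared before recovery finishes are deferred.
        recoveryFuture.whenComplete((ignored, error) -> log.add("process requirements A"));
        log.add("registered A");

        // First completion wins, e.g. the recovered workers became empty.
        boolean first = recoveryFuture.complete(null);
        // A later completion, e.g. the timeout firing, has no effect.
        boolean second = recoveryFuture.complete(null);
        log.add("first=" + first + ", second=" + second);

        // On an already completed future the callback runs immediately.
        recoveryFuture.whenComplete((ignored, error) -> log.add("process requirements B"));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because of these two properties, callers would not need to track separately whether recovery has already finished or which of the completion conditions fired first.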




[GitHub] [flink] KarmaGYZ commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r917525860


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -504,8 +522,11 @@ private void checkResourceRequirementsWithDelay() {
                                             Preconditions.checkNotNull(requirementsCheckFuture)
                                                     .complete(null);
                                         }),
-                        requirementsCheckDelay.toMillis(),
+                        delay,
                         TimeUnit.MILLISECONDS);
+                isRequirementsCheckLongDelay = false;
+            } else {
+                checkResourceRequirements();

Review Comment:
   I lean toward keeping the current implementation and simplifying that logic after the retirement of the `DeclarativeSlotManager`.





[GitHub] [flink] Aitozi commented on pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #19840:
URL: https://github.com/apache/flink/pull/19840#issuecomment-1179638069

   The failed task seems to be caused by a Kafka connector test timeout: https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/37968/logs/195




[GitHub] [flink] Aitozi commented on pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #19840:
URL: https://github.com/apache/flink/pull/19840#issuecomment-1179873752

   Hi @KarmaGYZ, please take another look, thanks.




[GitHub] [flink] Aitozi commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r917521227


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -504,8 +522,11 @@ private void checkResourceRequirementsWithDelay() {
                                             Preconditions.checkNotNull(requirementsCheckFuture)
                                                     .complete(null);
                                         }),
-                        requirementsCheckDelay.toMillis(),
+                        delay,
                         TimeUnit.MILLISECONDS);
+                isRequirementsCheckLongDelay = false;
+            } else {
+                checkResourceRequirements();

Review Comment:
   I mean that some tests rely on the synchronous behavior of `DeclarativeSlotManager#checkResourceRequirements`, so they check the result synchronously. "Replace it with a slight timeout wait" means replacing these `Assert` calls with a short timeout wait.
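
A "slight timeout wait" could look roughly like the following sketch, which uses only JDK classes. It assumes the check is handed to a scheduler even for a zero delay, so the test waits on a future with a bound instead of asserting right after the call. The class, method, and variable names are hypothetical, and the 5-second bound is an arbitrary choice.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of waiting with a timeout instead of asserting synchronously.
class TimeoutWaitSketch {
    static int checkAfterZeroDelaySchedule() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicInteger allocated = new AtomicInteger();
            CompletableFuture<Void> checkDone = new CompletableFuture<>();

            // Even with a zero delay the task runs on the executor thread.
            executor.schedule(
                    () -> {
                        allocated.incrementAndGet();
                        checkDone.complete(null);
                    },
                    0,
                    TimeUnit.MILLISECONDS);

            // Reading allocated.get() right here would be racy, so wait with a bound first.
            checkDone.get(5, TimeUnit.SECONDS);
            return allocated.get();
        } catch (Exception e) {
            throw new IllegalStateException("check did not finish in time", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(checkAfterZeroDelaySchedule());
    }
}
```

The cost is that every formerly synchronous assertion needs such a wait, which is the test churn the comment is trying to avoid.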





[GitHub] [flink] Aitozi commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r900706727


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -504,8 +522,11 @@ private void checkResourceRequirementsWithDelay() {
                                             Preconditions.checkNotNull(requirementsCheckFuture)
                                                     .complete(null);
                                         }),
-                        requirementsCheckDelay.toMillis(),
+                        delay,
                         TimeUnit.MILLISECONDS);
+                isRequirementsCheckLongDelay = false;
+            } else {
+                checkResourceRequirements();

Review Comment:
   I tried this approach, but found that it changes this method's behavior to async when the delay is 0, whereas the current behavior is sync. So I kept the current implementation, and most tests do not have to be touched.





[GitHub] [flink] KarmaGYZ commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r884242067


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java:
##########
@@ -79,8 +81,19 @@ public class DeclarativeSlotManager implements SlotManager {
     private final Map<JobID, String> jobMasterTargetAddresses = new HashMap<>();
     private final Map<SlotID, AllocationID> pendingSlotAllocations;
 
+    /** Delay of the requirement change check in the slot manager. */
+    private final Duration requirementsCheckDelay;

Review Comment:
   I would prefer to move this to a preceding issue.





[GitHub] [flink] KarmaGYZ commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r884357127


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java:
##########
@@ -79,8 +81,19 @@ public class DeclarativeSlotManager implements SlotManager {
     private final Map<JobID, String> jobMasterTargetAddresses = new HashMap<>();
     private final Map<SlotID, AllocationID> pendingSlotAllocations;
 
+    /** Delay of the requirement change check in the slot manager. */
+    private final Duration requirementsCheckDelay;

Review Comment:
   Yes.





[GitHub] [flink] flinkbot commented on pull request #19840: [FLINK-24713] Support the initial delay for SlotManager to waiting fo…

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19840:
URL: https://github.com/apache/flink/pull/19840#issuecomment-1140382750

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "38bf7a8b283a7f033f7e0ca38cd8ca032fe496ca",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "38bf7a8b283a7f033f7e0ca38cd8ca032fe496ca",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 38bf7a8b283a7f033f7e0ca38cd8ca032fe496ca UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] Aitozi closed pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi closed pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…
URL: https://github.com/apache/flink/pull/19840




[GitHub] [flink] reswqa commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r899809302


##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java:
##########
@@ -752,4 +753,76 @@ public void testAllocationUpdatesIgnoredIfSlotMarkedAsAllocatedAfterSlotReport()
 
         System.setSecurityManager(null);
     }
+
+    @Test
+    public void testRequirementLongDelayOnlyTakeEffectOnce() throws Exception {
+        final CompletableFuture<Void> allocationIdFuture1 = new CompletableFuture<>();
+        final CompletableFuture<Void> allocationIdFuture2 = new CompletableFuture<>();
+        final TaskExecutorGateway taskExecutorGateway1 =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setRequestSlotFunction(
+                                tuple6 -> {
+                                    allocationIdFuture1.complete(null);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .createTestingTaskExecutorGateway();
+        final TaskExecutorGateway taskExecutorGateway2 =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setRequestSlotFunction(
+                                tuple6 -> {
+                                    allocationIdFuture2.complete(null);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .createTestingTaskExecutorGateway();
+        final TaskExecutorConnection taskExecutorConnection1 =
+                new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway1);
+        final TaskExecutorConnection taskExecutorConnection2 =
+                new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway2);
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            final CompletableFuture<Boolean> registerTaskManagerFuture1 =
+                                    new CompletableFuture<>();
+                            final CompletableFuture<Boolean> registerTaskManagerFuture2 =
+                                    new CompletableFuture<>();
+                            runInMainThread(
+                                    () -> {
+                                        getSlotManager().enlargeRequirementsCheckDelayOnce();
+                                        getSlotManager()
+                                                .processResourceRequirements(
+                                                        createResourceRequirements(
+                                                                new JobID(),
+                                                                2,
+                                                                DEFAULT_SLOT_RESOURCE_PROFILE));
+                                        registerTaskManagerFuture1.complete(
+                                                getSlotManager()
+                                                        .registerTaskManager(
+                                                                taskExecutorConnection1,
+                                                                new SlotReport(),
+                                                                DEFAULT_SLOT_RESOURCE_PROFILE,
+                                                                DEFAULT_SLOT_RESOURCE_PROFILE));
+                                    });
+                            assertThat(
+                                    assertFutureCompleteAndReturn(registerTaskManagerFuture1),
+                                    is(true));
+                            assertFutureNotComplete(allocationIdFuture1);
+                            allocationIdFuture1.get(200, TimeUnit.MILLISECONDS);

Review Comment:
   Just a small suggestion: newly introduced tests should use AssertJ, so the code would look like this:
   ```Assertions.assertThat(allocationIdFuture1).succeedsWithin(FUTURE_TIMEOUT_SECOND, TimeUnit.SECONDS)```
   Of course, it would be even better to migrate the entire test class to JUnit 5 along the way. The runtime module is very large, so migrating it to JUnit 5 and AssertJ takes a lot of work, and every new test that does not follow this convention makes the later migration harder.
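
   For readers unfamiliar with the suggested `succeedsWithin` assertion, the sketch below approximates in plain Java what such a check verifies: the future must complete normally within the timeout, otherwise the test fails. This is not AssertJ's implementation; `FutureChecks` and its method are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper (not part of Flink or AssertJ) mimicking a "succeeds within" check. */
final class FutureChecks {

    /** Returns the future's value, or fails if it does not complete normally in time. */
    static <T> T succeedsWithin(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            throw new AssertionError("future did not complete within " + timeout + " " + unit, e);
        } catch (ExecutionException e) {
            throw new AssertionError("future completed exceptionally", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before failing the check.
            Thread.currentThread().interrupt();
            throw new AssertionError("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        // An already completed future passes the check immediately.
        String value = succeedsWithin(CompletableFuture.completedFuture("ok"), 1, TimeUnit.SECONDS);
        System.out.println("completed with: " + value);

        // A future that nobody completes fails the check after the timeout.
        try {
            succeedsWithin(new CompletableFuture<String>(), 50, TimeUnit.MILLISECONDS);
        } catch (AssertionError e) {
            System.out.println("check failed: " + e.getMessage());
        }
    }
}
```

   Compared with a bare `get(200, TimeUnit.MILLISECONDS)`, a check of this shape reports a timeout as an assertion failure with a readable message rather than as an unhandled `TimeoutException`.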





[GitHub] [flink] Aitozi commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r900113489


##########
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java:
##########
@@ -150,6 +150,18 @@ public class ResourceManagerOptions {
                     .defaultValue(Duration.ofMillis(50))
                     .withDescription("The delay of the resource requirements check.");
 
+    /**
+     * The long delay of requirements check. This is only used for waiting for the previous
+     * TaskManagers registration and will only take effect in the first requirements check.
+     */
+    public static final ConfigOption<Duration> REQUIREMENTS_CHECK_LONG_DELAY =
+            ConfigOptions.key("slotmanager.requirement-check.long-delay")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(3))

Review Comment:
   Yes, I think we could use a conservative default value of 1s and add a description to let users know that if redundant TaskManagers are launched during a JobManager failover, they can increase this delay to solve it.
   What do you think?
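
   The one-shot behavior under discussion (a long delay for the first requirements check only, then the normal delay) could be sketched roughly as follows. This is a minimal illustration and not the code in this PR: `RequirementsCheckDelay`, `enlargeOnce` and `nextDelay` are hypothetical names, and the 50 ms / 1 s values simply mirror the normal delay default in the diff and the default proposed above.

```java
import java.time.Duration;

/**
 * Hypothetical helper, not Flink's actual implementation: picks the delay for the
 * next requirements check and applies the long delay at most once after it is armed.
 */
final class RequirementsCheckDelay {
    private final Duration normalDelay;
    private final Duration longDelay;
    private boolean useLongDelayOnce;

    RequirementsCheckDelay(Duration normalDelay, Duration longDelay) {
        this.normalDelay = normalDelay;
        this.longDelay = longDelay;
    }

    /** Arms the long delay for the next check only, e.g. after a JobManager failover. */
    void enlargeOnce() {
        useLongDelayOnce = true;
    }

    /** Returns the delay for the next check and resets the one-shot flag. */
    Duration nextDelay() {
        if (useLongDelayOnce) {
            useLongDelayOnce = false;
            return longDelay;
        }
        return normalDelay;
    }

    public static void main(String[] args) {
        RequirementsCheckDelay delay =
                new RequirementsCheckDelay(Duration.ofMillis(50), Duration.ofSeconds(1));
        delay.enlargeOnce();
        System.out.println(delay.nextDelay().toMillis()); // first check waits the long delay
        System.out.println(delay.nextDelay().toMillis()); // later checks use the normal delay
    }
}
```

   With this shape, a larger long delay only postpones the first check after a failover; subsequent checks are unaffected, which is why a conservative default plus a documented knob seems reasonable.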





[GitHub] [flink] Aitozi commented on pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #19840:
URL: https://github.com/apache/flink/pull/19840#issuecomment-1157161191

   The CI run failed because the test case `org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase.runCheckpointedProgram` did not finish, but it completes locally.




[GitHub] [flink] Aitozi commented on pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #19840:
URL: https://github.com/apache/flink/pull/19840#issuecomment-1157161267

   @flinkbot run azure




[GitHub] [flink] Aitozi commented on a diff in pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #19840:
URL: https://github.com/apache/flink/pull/19840#discussion_r904906154


##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java:
##########
@@ -1449,20 +1449,70 @@ public void testProcessResourceRequirementsWithDelay() throws Exception {
             final JobID jobId = new JobID();
 
             slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
-            Assertions.assertEquals(0, allocatedResourceCounter.get());
-            Assertions.assertEquals(
-                    1, scheduledExecutor.getActiveNonPeriodicScheduledTask().size());
+            Assertions.assertThat(allocatedResourceCounter.get()).isEqualTo(0);
+            Assertions.assertThat(scheduledExecutor.getActiveNonPeriodicScheduledTask().size())
+                    .isEqualTo(1);
             final ScheduledFuture<?> future =
                     scheduledExecutor.getActiveNonPeriodicScheduledTask().iterator().next();
-            Assertions.assertEquals(delay.toMillis(), future.getDelay(TimeUnit.MILLISECONDS));
+            Assertions.assertThat(future.getDelay(TimeUnit.MILLISECONDS))
+                    .isEqualTo(delay.toMillis());
 
             // the second request is skipped
             slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
-            Assertions.assertEquals(
-                    1, scheduledExecutor.getActiveNonPeriodicScheduledTask().size());
+            Assertions.assertThat(scheduledExecutor.getActiveNonPeriodicScheduledTask().size())
+                    .isEqualTo(1);
 
             scheduledExecutor.triggerNonPeriodicScheduledTask();
-            Assertions.assertEquals(1, allocatedResourceCounter.get());
+            Assertions.assertThat(allocatedResourceCounter.get()).isEqualTo(1);
+        }
+    }
+
+    @Test
+    public void testProcessResourceRequirementsWithLongDelay() throws Exception {
+        final ResourceTracker resourceTracker = new DefaultResourceTracker();
+        final AtomicInteger allocatedResourceCounter = new AtomicInteger(0);
+        final ManuallyTriggeredScheduledExecutor scheduledExecutor =
+                new ManuallyTriggeredScheduledExecutor();
+        final Duration longDelay = Duration.ofSeconds(1);
+        final Duration delay = Duration.ofMillis(500);
+        try (final DeclarativeSlotManager slotManager =
+                createDeclarativeSlotManagerBuilder(scheduledExecutor)
+                        .setResourceTracker(resourceTracker)
+                        .setRequirementCheckDelay(delay)
+                        .setRequirementCheckLongDelay(longDelay)
+                        .buildAndStartWithDirectExec(
+                                ResourceManagerId.generate(),
+                                new TestingResourceActionsBuilder()
+                                        .setAllocateResourceConsumer(
+                                                workerResourceSpec ->
+                                                        allocatedResourceCounter.getAndIncrement())
+                                        .build())) {
+
+            final JobID jobId = new JobID();
+
+            slotManager.enlargeRequirementsCheckDelayOnce();
+
+            slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
+
+            Assertions.assertThat(allocatedResourceCounter.get()).isEqualTo(0);
+            Assertions.assertThat(scheduledExecutor.getActiveNonPeriodicScheduledTask().size())

Review Comment:
   thanks, fixed


