Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/12 17:00:52 UTC

[GitHub] [flink] Aitozi opened a new pull request, #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Aitozi opened a new pull request, #20256:
URL: https://github.com/apache/flink/pull/20256

   …after the recovery phase has finished
   
   ## What is the purpose of the change
   
   This PR addresses the problem of extra workers being requested after a JobManager failover. Related issues:
   
   https://issues.apache.org/jira/browse/FLINK-24713
   https://issues.apache.org/jira/browse/FLINK-27576
   
   Based on https://github.com/apache/flink/pull/19786
   
   ## Brief change log
   
     -  Add the `getRecoveryFuture` method to `ResourceManager` so that subclasses can indicate whether they are ready to serve.
     -  Postpone processing resource requirements until the recovery future completes.
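
The gating idea in the change log can be sketched in plain Java. This is a minimal illustration, not Flink's code: `ReadyToServeGate`, `markReady`, and the string-based requirements are hypothetical stand-ins, and only `java.util.concurrent.CompletableFuture` is a real API here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a resource manager; not Flink's actual class. */
final class ReadyToServeGate {
    // Completed once recovery of previous-attempt workers has finished or timed out.
    private final CompletableFuture<Void> readyToServe = new CompletableFuture<>();
    // Records the requirements that were actually processed, in order.
    private final List<String> processed = new ArrayList<>();

    /** Defers processing until the gate opens; runs immediately if it is already open. */
    CompletableFuture<String> declareRequiredResources(String requirements) {
        return readyToServe.thenApply(
                ignored -> {
                    processed.add(requirements);
                    return "ack";
                });
    }

    /** Opens the gate; any deferred requests are processed at this point. */
    void markReady() {
        readyToServe.complete(null);
    }

    List<String> processed() {
        return processed;
    }
}
```

A request declared before `markReady()` stays pending and is processed only once the gate opens, which is what keeps a freshly restarted manager from requesting workers it is about to recover.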
   
   ## Verifying this change
   
   Two tests are added to verify the change.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Aitozi commented on pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #20256:
URL: https://github.com/apache/flink/pull/20256#issuecomment-1182719574

   @flinkbot run azure




[GitHub] [flink] Aitozi commented on a diff in pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #20256:
URL: https://github.com/apache/flink/pull/20256#discussion_r921840763


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -418,6 +443,30 @@ private void tryResetWorkerCreationCoolDown() {
         }
     }
 
+    @Override
+    public CompletableFuture<Acknowledge> getRecoveryFuture() {
+        return recoveryFuture;
+    }
+
+    private void tryRemovePreviousPendingRecoveryTaskManager(ResourceID resourceID, String reason) {
+        long sizeBeforeRemove = previousAttemptUnregisteredWorkers.size();
+        boolean exist = previousAttemptUnregisteredWorkers.remove(resourceID);
+        if (exist) {
+            if (!recoveryFuture.isDone()) {

Review Comment:
   `exist` indicates that the count has changed; if the count has not changed, we do not need to check the size condition. I moved the `!recoveryFuture.isDone()` check into the inner `if` so that the log message is more accurate. The resource manager becomes ready to serve either when the timeout is reached or when all workers have recovered.





[GitHub] [flink] Aitozi commented on a diff in pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #20256:
URL: https://github.com/apache/flink/pull/20256#discussion_r921839278


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -326,6 +350,7 @@ private void scheduleWorkerRegistrationTimeoutCheck(final ResourceID resourceId)
                                 "Worker {} did not register in {}, will stop it and request a new one if needed.",
                                 resourceId,
                                 workerRegistrationTimeout);
+                        tryRemovePreviousPendingRecoveryTaskManager(resourceId, "Register timeout");

Review Comment:
   Yes, but this call site carries the accurate reason, "Register timeout". I removed the `reason` from the log for simplicity.





[GitHub] [flink] Aitozi commented on a diff in pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #20256:
URL: https://github.com/apache/flink/pull/20256#discussion_r921831385


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -244,6 +254,20 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
                     "Worker {} recovered from previous attempt.",
                     resourceId.getStringWithMetadata());
         }
+        if (recoveredWorkers.size() > 0) {
+            scheduleRunAsync(
+                    () -> {
+                        if (!recoveryFuture.isDone()) {

Review Comment:
   If the future has already been completed by successful registrations when the timeout is reached, there is no need to log the timeout.





[GitHub] [flink] xintongsong commented on a diff in pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #20256:
URL: https://github.com/apache/flink/pull/20256#discussion_r922988717


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -244,6 +254,20 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
                     "Worker {} recovered from previous attempt.",
                     resourceId.getStringWithMetadata());
         }
+        if (recoveredWorkers.size() > 0) {

Review Comment:
   There's no need to go through the `scheduleRunAsync` when timeout is zero.





[GitHub] [flink] xintongsong commented on a diff in pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #20256:
URL: https://github.com/apache/flink/pull/20256#discussion_r922993786


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -244,6 +254,20 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
                     "Worker {} recovered from previous attempt.",
                     resourceId.getStringWithMetadata());
         }
+        if (recoveredWorkers.size() > 0) {
+            scheduleRunAsync(
+                    () -> {
+                        if (!recoveryFuture.isDone()) {

Review Comment:
   Same here for the log vs. complexity.





[GitHub] [flink] xintongsong commented on a diff in pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #20256:
URL: https://github.com/apache/flink/pull/20256#discussion_r922993714


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -418,6 +443,30 @@ private void tryResetWorkerCreationCoolDown() {
         }
     }
 
+    @Override
+    public CompletableFuture<Acknowledge> getRecoveryFuture() {
+        return recoveryFuture;
+    }
+
+    private void tryRemovePreviousPendingRecoveryTaskManager(ResourceID resourceID, String reason) {
+        long sizeBeforeRemove = previousAttemptUnregisteredWorkers.size();
+        boolean exist = previousAttemptUnregisteredWorkers.remove(resourceID);
+        if (exist) {
+            if (!recoveryFuture.isDone()) {

Review Comment:
   I'm not sure log accuracy is that important for this feature, compared to the complexity it adds with all the if/else branches.





[GitHub] [flink] Aitozi commented on a diff in pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #20256:
URL: https://github.com/apache/flink/pull/20256#discussion_r928204382


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -244,6 +254,20 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
                     "Worker {} recovered from previous attempt.",
                     resourceId.getStringWithMetadata());
         }
+        if (recoveredWorkers.size() > 0) {

Review Comment:
   Makes sense, fixed.





[GitHub] [flink] Aitozi commented on pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #20256:
URL: https://github.com/apache/flink/pull/20256#issuecomment-1184028495

   @xintongsong please take a look at this PR. It's a new PR based on your suggestion in https://github.com/apache/flink/pull/19840. Also cc @KarmaGYZ
   PS: I don't know why flinkbot has not run for the latest commit.




[GitHub] [flink] Aitozi commented on a diff in pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #20256:
URL: https://github.com/apache/flink/pull/20256#discussion_r921925686


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -244,6 +254,20 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
                     "Worker {} recovered from previous attempt.",
                     resourceId.getStringWithMetadata());
         }
+        if (recoveredWorkers.size() > 0) {

Review Comment:
   I think we can still complete the future here when `previousWorkerRecoverTimeout.isZero`.





[GitHub] [flink] Aitozi commented on pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #20256:
URL: https://github.com/apache/flink/pull/20256#issuecomment-1185325931

   Addressed your comments, please take a look again @xintongsong 




[GitHub] [flink] Aitozi commented on a diff in pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #20256:
URL: https://github.com/apache/flink/pull/20256#discussion_r921841180


##########
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java:
##########
@@ -261,6 +261,14 @@ public class ResourceManagerOptions {
                                     + TaskManagerOptions.REGISTRATION_TIMEOUT.key()
                                     + "'.");
 
+    /** Timeout for ResourceManager to recover all the previous attempts workers. */
+    public static final ConfigOption<Duration> RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT =
+            ConfigOptions.key("resourcemanager.previous-worker.recovery.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(5))

Review Comment:
   IMO, although 0 matches the previous behavior, it is not a good choice for production environments. But I'm OK with a conservative default value that keeps the behavior the same as before.





[GitHub] [flink] xintongsong commented on a diff in pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #20256:
URL: https://github.com/apache/flink/pull/20256#discussion_r920959736


##########
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java:
##########
@@ -261,6 +261,14 @@ public class ResourceManagerOptions {
                                     + TaskManagerOptions.REGISTRATION_TIMEOUT.key()
                                     + "'.");
 
+    /** Timeout for ResourceManager to recover all the previous attempts workers. */
+    public static final ConfigOption<Duration> RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT =
+            ConfigOptions.key("resourcemanager.previous-worker.recovery.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(5))
+                    .withDescription(
+                            "Timeout for resource manager to recover all the previous attempts workers.");

Review Comment:
   I think the description can be improved to make it more helpful for common users. E.g., what happens when the timeout is reached, how should the users tune this config.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -102,6 +103,12 @@
      */
     private CompletableFuture<Void> startWorkerCoolDown;
 
+    /** The future indicates whether the rm is ready to serve. */
+    private final CompletableFuture<Acknowledge> recoveryFuture;

Review Comment:
   ```suggestion
       private final CompletableFuture<Void> recoveryFuture;
   ```



##########
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java:
##########
@@ -261,6 +261,14 @@ public class ResourceManagerOptions {
                                     + TaskManagerOptions.REGISTRATION_TIMEOUT.key()
                                     + "'.");
 
+    /** Timeout for ResourceManager to recover all the previous attempts workers. */
+    public static final ConfigOption<Duration> RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT =
+            ConfigOptions.key("resourcemanager.previous-worker.recovery.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(5))

Review Comment:
   I think the default value should be `0`, to align with the previous behaviors.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -326,6 +350,7 @@ private void scheduleWorkerRegistrationTimeoutCheck(final ResourceID resourceId)
                                 "Worker {} did not register in {}, will stop it and request a new one if needed.",
                                 resourceId,
                                 workerRegistrationTimeout);
+                        tryRemovePreviousPendingRecoveryTaskManager(resourceId, "Register timeout");

Review Comment:
   We should not need this, because it is covered by `internalStopWorker`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -244,6 +254,20 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
                     "Worker {} recovered from previous attempt.",
                     resourceId.getStringWithMetadata());
         }
+        if (recoveredWorkers.size() > 0) {
+            scheduleRunAsync(
+                    () -> {
+                        if (!recoveryFuture.isDone()) {

Review Comment:
   No need to check for `recoveryFuture.isDone()`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -418,6 +443,30 @@ private void tryResetWorkerCreationCoolDown() {
         }
     }
 
+    @Override
+    public CompletableFuture<Acknowledge> getRecoveryFuture() {
+        return recoveryFuture;
+    }
+
+    private void tryRemovePreviousPendingRecoveryTaskManager(ResourceID resourceID, String reason) {
+        long sizeBeforeRemove = previousAttemptUnregisteredWorkers.size();
+        boolean exist = previousAttemptUnregisteredWorkers.remove(resourceID);
+        if (exist) {
+            if (!recoveryFuture.isDone()) {

Review Comment:
   No need to check for `exist` and `recoveryFuture.isDone()`.
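
For context on why the `isDone()` guard is not needed for correctness: `CompletableFuture.complete` only takes effect on a future that is not yet completed and returns `false` otherwise, so calling it a second time is harmless. The small sketch below demonstrates this with the JDK alone; the class and method names are hypothetical and not part of the PR.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper that shows complete() is safe to call more than once. */
final class CompleteTwice {
    /** Returns the results of two consecutive complete() calls on the same future. */
    static boolean[] completeTwice(CompletableFuture<Void> future) {
        boolean first = future.complete(null);  // transitions a pending future to completed
        boolean second = future.complete(null); // has no effect: the future is already completed
        return new boolean[] {first, second};
    }
}
```

The guard therefore only affects whether the accompanying log line is emitted, which is the trade-off between log accuracy and branching discussed in this thread.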



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -531,9 +531,12 @@ public CompletableFuture<Acknowledge> declareRequiredResources(
 
         if (null != jobManagerRegistration) {
             if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
-                slotManager.processResourceRequirements(resourceRequirements);
-
-                return CompletableFuture.completedFuture(Acknowledge.get());
+                return getRecoveryFuture()
+                        .thenApply(
+                                acknowledge -> {
+                                    slotManager.processResourceRequirements(resourceRequirements);

Review Comment:
   ```suggestion
                                       validateRunsInMainThread();
                                       slotManager.processResourceRequirements(resourceRequirements);
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -1222,6 +1225,14 @@ protected abstract void internalDeregisterApplication(
      */
     public abstract boolean stopWorker(WorkerType worker);
 
+    /**
+     * Get the recovery future of the resource manager.
+     *
+     * @return The recovery future of the resource manager, which indicated whether it is ready to
+     *     serve.
+     */
+    protected abstract CompletableFuture<Acknowledge> getRecoveryFuture();

Review Comment:
   I'd suggest naming this something like `getReadyToServeFuture()`, which directly describes its semantics.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -244,6 +254,20 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
                     "Worker {} recovered from previous attempt.",
                     resourceId.getStringWithMetadata());
         }
+        if (recoveredWorkers.size() > 0) {

Review Comment:
   ```suggestion
           if (recoveredWorkers.size() > 0 && !previousWorkerRecoverTimeout.isZero()) {
   ```
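
One way to read the suggested condition is sketched below: a timer is armed only when there are recovered workers to wait for and the timeout is non-zero, and otherwise the future is completed right away. This is an assumption about the surrounding control flow, since the `else` branch is not shown in the diff. `RecoveryTimeout`, `armOrComplete`, and the use of a plain `ScheduledExecutorService` in place of Flink's `scheduleRunAsync` are hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the zero-timeout short circuit; not Flink's actual code. */
final class RecoveryTimeout {
    /**
     * Completes readyToServe immediately when there is nothing to wait for, otherwise
     * schedules the completion after the timeout. Returns true if a timer was scheduled.
     */
    static boolean armOrComplete(
            int recoveredWorkers,
            Duration timeout,
            ScheduledExecutorService executor,
            CompletableFuture<Void> readyToServe) {
        if (recoveredWorkers > 0 && !timeout.isZero()) {
            executor.schedule(
                    () -> {
                        // Harmless if the future was already completed by registrations.
                        readyToServe.complete(null);
                    },
                    timeout.toMillis(),
                    TimeUnit.MILLISECONDS);
            return true;
        }
        // Assumed fallback: no workers to wait for, or waiting is disabled.
        readyToServe.complete(null);
        return false;
    }
}
```

With a zero timeout the future is completed synchronously, so no task is queued at all.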





[GitHub] [flink] Aitozi commented on a diff in pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #20256:
URL: https://github.com/apache/flink/pull/20256#discussion_r928205394


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -418,6 +443,30 @@ private void tryResetWorkerCreationCoolDown() {
         }
     }
 
+    @Override
+    public CompletableFuture<Acknowledge> getRecoveryFuture() {
+        return recoveryFuture;
+    }
+
+    private void tryRemovePreviousPendingRecoveryTaskManager(ResourceID resourceID, String reason) {
+        long sizeBeforeRemove = previousAttemptUnregisteredWorkers.size();
+        boolean exist = previousAttemptUnregisteredWorkers.remove(resourceID);
+        if (exist) {
+            if (!recoveryFuture.isDone()) {

Review Comment:
   Improved the condition check here.





[GitHub] [flink] flinkbot commented on pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20256:
URL: https://github.com/apache/flink/pull/20256#issuecomment-1182024533

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0884bc6a479843df3f269850c688bce0dd835210",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0884bc6a479843df3f269850c688bce0dd835210",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0884bc6a479843df3f269850c688bce0dd835210 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] xintongsong commented on a diff in pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #20256:
URL: https://github.com/apache/flink/pull/20256#discussion_r922991336


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -244,6 +253,20 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
                     "Worker {} recovered from previous attempt.",
                     resourceId.getStringWithMetadata());
         }
+        if (recoveredWorkers.size() > 0) {
+            scheduleRunAsync(
+                    () -> {
+                        if (!readyToServeFuture.isDone()) {
+                            readyToServeFuture.complete(null);
+                            log.info(
+                                    "Timeout to wait recovery taskmanagers, recovery future is completed");
+                        }
+                    },
+                    previousWorkerRecoverTimeout.getSeconds(),
+                    TimeUnit.SECONDS);

Review Comment:
   ```suggestion
                       previousWorkerRecoverTimeout.toMillis(),
                       TimeUnit.MILLISECONDS);
   ```
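
The difference between the two unit conversions can be checked with `java.time.Duration` alone: `getSeconds()` returns only the whole-seconds part, so any sub-second remainder of the configured timeout is lost. The helper class and method names below are hypothetical and exist only for illustration.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/** Hypothetical comparison of the two ways to turn a Duration into a scheduling delay. */
final class TimeoutUnits {
    /** Delay as computed via getSeconds() and TimeUnit.SECONDS: whole seconds only. */
    static long delayMillisViaSeconds(Duration timeout) {
        return TimeUnit.SECONDS.toMillis(timeout.getSeconds());
    }

    /** Delay as computed via toMillis() and TimeUnit.MILLISECONDS: millisecond precision. */
    static long delayMillisViaMillis(Duration timeout) {
        return timeout.toMillis();
    }
}
```

For a timeout of 500 ms the seconds-based variant yields a delay of 0, and for 1500 ms it yields 1000 ms, while the millisecond-based variant preserves both values.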





[GitHub] [flink] Aitozi commented on a diff in pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #20256:
URL: https://github.com/apache/flink/pull/20256#discussion_r928205414


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -244,6 +254,20 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
                     "Worker {} recovered from previous attempt.",
                     resourceId.getStringWithMetadata());
         }
+        if (recoveredWorkers.size() > 0) {
+            scheduleRunAsync(
+                    () -> {
+                        if (!recoveryFuture.isDone()) {

Review Comment:
   Removed.





[GitHub] [flink] Aitozi commented on pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #20256:
URL: https://github.com/apache/flink/pull/20256#issuecomment-1193259472

   Addressed your comments, please take a look again @xintongsong , thanks.




[GitHub] [flink] xintongsong closed pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

Posted by GitBox <gi...@apache.org>.
xintongsong closed pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving
URL: https://github.com/apache/flink/pull/20256

