Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/15 02:09:59 UTC

[GitHub] [flink] xintongsong commented on a diff in pull request #20256: [FLINK-24713][Runtime/Coordination] Postpone resourceManager serving

xintongsong commented on code in PR #20256:
URL: https://github.com/apache/flink/pull/20256#discussion_r920959736


##########
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java:
##########
@@ -261,6 +261,14 @@ public class ResourceManagerOptions {
                                     + TaskManagerOptions.REGISTRATION_TIMEOUT.key()
                                     + "'.");
 
+    /** Timeout for ResourceManager to recover all the previous attempts workers. */
+    public static final ConfigOption<Duration> RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT =
+            ConfigOptions.key("resourcemanager.previous-worker.recovery.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(5))
+                    .withDescription(
+                            "Timeout for resource manager to recover all the previous attempts workers.");

Review Comment:
   I think the description can be improved to make it more helpful for common users, e.g., by explaining what happens when the timeout is reached and how users should tune this config.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -102,6 +103,12 @@
      */
     private CompletableFuture<Void> startWorkerCoolDown;
 
+    /** The future indicates whether the rm is ready to serve. */
+    private final CompletableFuture<Acknowledge> recoveryFuture;

Review Comment:
   ```suggestion
       private final CompletableFuture<Void> recoveryFuture;
   ```



##########
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java:
##########
@@ -261,6 +261,14 @@ public class ResourceManagerOptions {
                                     + TaskManagerOptions.REGISTRATION_TIMEOUT.key()
                                     + "'.");
 
+    /** Timeout for ResourceManager to recover all the previous attempts workers. */
+    public static final ConfigOption<Duration> RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT =
+            ConfigOptions.key("resourcemanager.previous-worker.recovery.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(5))

Review Comment:
   I think the default value should be `0`, to align with the previous behavior.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -326,6 +350,7 @@ private void scheduleWorkerRegistrationTimeoutCheck(final ResourceID resourceId)
                                 "Worker {} did not register in {}, will stop it and request a new one if needed.",
                                 resourceId,
                                 workerRegistrationTimeout);
+                        tryRemovePreviousPendingRecoveryTaskManager(resourceId, "Register timeout");

Review Comment:
   We should not need this, because it is covered by `internalStopWorker`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -244,6 +254,20 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
                     "Worker {} recovered from previous attempt.",
                     resourceId.getStringWithMetadata());
         }
+        if (recoveredWorkers.size() > 0) {
+            scheduleRunAsync(
+                    () -> {
+                        if (!recoveryFuture.isDone()) {

Review Comment:
   No need to check for `recoveryFuture.isDone()`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -418,6 +443,30 @@ private void tryResetWorkerCreationCoolDown() {
         }
     }
 
+    @Override
+    public CompletableFuture<Acknowledge> getRecoveryFuture() {
+        return recoveryFuture;
+    }
+
+    private void tryRemovePreviousPendingRecoveryTaskManager(ResourceID resourceID, String reason) {
+        long sizeBeforeRemove = previousAttemptUnregisteredWorkers.size();
+        boolean exist = previousAttemptUnregisteredWorkers.remove(resourceID);
+        if (exist) {
+            if (!recoveryFuture.isDone()) {

Review Comment:
   No need to check for `exist` and `recoveryFuture.isDone()`.
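
One possible reading of this comment is that both guards are redundant because the underlying calls are already safe to repeat: `Set.remove` simply returns `false` for an absent element, and `CompletableFuture.complete` returns `false` without any effect once the future is done. The sketch below illustrates that with plain JDK types. The class `PendingRecoverySketch`, its method names, and the use of `String` worker ids are hypothetical stand-ins, not code from the PR.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the pending-recovery bookkeeping; not Flink code. */
final class PendingRecoverySketch {
    private final Set<String> unregisteredWorkers = new HashSet<>();
    private final CompletableFuture<Void> readyToServe = new CompletableFuture<>();

    PendingRecoverySketch(Set<String> recoveredWorkers) {
        unregisteredWorkers.addAll(recoveredWorkers);
    }

    /** Safe to call for unknown workers and after the future has completed. */
    void workerNoLongerPending(String workerId) {
        // Returns false and changes nothing if the worker is not in the set.
        unregisteredWorkers.remove(workerId);
        if (unregisteredWorkers.isEmpty()) {
            // Returns false and changes nothing if the future is already done.
            readyToServe.complete(null);
        }
    }

    boolean isReady() {
        return readyToServe.isDone();
    }

    public static void main(String[] args) {
        PendingRecoverySketch sketch = new PendingRecoverySketch(Set.of("w1", "w2"));
        sketch.workerNoLongerPending("unknown"); // not tracked: no effect
        sketch.workerNoLongerPending("w1");
        System.out.println("ready after w1: " + sketch.isReady());
        sketch.workerNoLongerPending("w2");
        sketch.workerNoLongerPending("w2"); // repeated call: no effect
        System.out.println("ready after w2: " + sketch.isReady());
    }
}
```

Dropping the guards keeps the method short, at the cost of not being able to log only when a worker was actually removed.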



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -531,9 +531,12 @@ public CompletableFuture<Acknowledge> declareRequiredResources(
 
         if (null != jobManagerRegistration) {
             if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
-                slotManager.processResourceRequirements(resourceRequirements);
-
-                return CompletableFuture.completedFuture(Acknowledge.get());
+                return getRecoveryFuture()
+                        .thenApply(
+                                acknowledge -> {
+                                    slotManager.processResourceRequirements(resourceRequirements);

Review Comment:
   ```suggestion
                                       validateRunsInMainThread();
                                       slotManager.processResourceRequirements(resourceRequirements);
   ```
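
The reason a thread check may be worthwhile here, as far as I can tell, is that a non-async `thenApply` callback registered on a not-yet-completed future typically runs on whichever thread completes that future, not on the thread that registered it. The sketch below demonstrates this with plain JDK types only. The class name `CallbackThreadSketch` and the thread name `completer` are hypothetical and unrelated to Flink's RPC main thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical demo of where a thenApply callback runs; not Flink code. */
final class CallbackThreadSketch {

    /** Returns the name of the thread that executed the thenApply callback. */
    static String callbackThreadName() {
        CompletableFuture<Void> ready = new CompletableFuture<>();

        // Registered while 'ready' is still incomplete, so it cannot run yet.
        CompletableFuture<String> callbackThread =
                ready.thenApply(ignored -> Thread.currentThread().getName());

        ExecutorService other =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "completer"));
        try {
            // Complete the future from a different thread.
            other.execute(() -> ready.complete(null));
            return callbackThread.join();
        } finally {
            other.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("callback: " + callbackThreadName());
    }
}
```

If the future in the PR is only ever completed from the main thread, the suggested `validateRunsInMainThread()` call documents and enforces that assumption.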



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -1222,6 +1225,14 @@ protected abstract void internalDeregisterApplication(
      */
     public abstract boolean stopWorker(WorkerType worker);
 
+    /**
+     * Get the recovery future of the resource manager.
+     *
+     * @return The recovery future of the resource manager, which indicated whether it is ready to
+     *     serve.
+     */
+    protected abstract CompletableFuture<Acknowledge> getRecoveryFuture();

Review Comment:
   I'd suggest naming this something like `getReadyToServeFuture()`, which directly describes its semantics.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -244,6 +254,20 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
                     "Worker {} recovered from previous attempt.",
                     resourceId.getStringWithMetadata());
         }
+        if (recoveredWorkers.size() > 0) {

Review Comment:
   ```suggestion
           if (recoveredWorkers.size() > 0 && !previousWorkerRecoverTimeout.isZero()) {
   ```
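
Together with the suggested default of `0`, this condition would presumably mean that a zero timeout skips the waiting period entirely, so the resource manager serves immediately as before. A minimal sketch of that gating with plain JDK types follows. The class `RecoveryTimeoutSketch`, the method `readyToServe`, the `String` worker ids, and the immediate completion in the `else` branch are all assumptions for illustration, since the rest of the PR's logic is not shown in this diff.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of timeout-gated readiness; not Flink code. */
final class RecoveryTimeoutSketch {

    /**
     * Returns a future that is already complete when there is nothing to wait
     * for (no recovered workers, or a zero timeout), and otherwise completes
     * once the timeout elapses.
     */
    static CompletableFuture<Void> readyToServe(
            Collection<String> recoveredWorkers,
            Duration timeout,
            ScheduledExecutorService scheduler) {
        CompletableFuture<Void> ready = new CompletableFuture<>();
        if (!recoveredWorkers.isEmpty() && !timeout.isZero()) {
            // Assumed behavior: give previous workers until the timeout to register.
            scheduler.schedule(
                    () -> {
                        ready.complete(null);
                    },
                    timeout.toMillis(),
                    TimeUnit.MILLISECONDS);
        } else {
            // Zero timeout or nothing recovered: serve right away.
            ready.complete(null);
        }
        return ready;
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<Void> immediate =
                    readyToServe(List.of("w1"), Duration.ZERO, scheduler);
            System.out.println("zero timeout done: " + immediate.isDone());

            CompletableFuture<Void> delayed =
                    readyToServe(List.of("w1"), Duration.ofMillis(200), scheduler);
            delayed.join(); // blocks until the timeout fires
            System.out.println("after timeout done: " + delayed.isDone());
        } finally {
            scheduler.shutdown();
        }
    }
}
```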


