You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@flink.apache.org by xt...@apache.org on 2022/02/21 03:37:35 UTC

[flink] 01/03: [FLINK-25893][runtime] Fix that ResourceManagerServiceImpl may call ResourceManager#deregisterApplication before RM being fully started.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d0c8547819d726ed8879132d82daf2129b086396
Author: Xintong Song <to...@gmail.com>
AuthorDate: Sun Feb 20 17:32:25 2022 +0800

    [FLINK-25893][runtime] Fix that ResourceManagerServiceImpl may call ResourceManager#deregisterApplication before RM being fully started.
---
 .../ResourceManagerServiceImpl.java                | 36 ++++++++++++++++------
 .../ResourceManagerServiceImplTest.java            | 34 ++++++++++++++++++++
 2 files changed, 61 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
index e0add8e..692993a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
@@ -128,20 +128,38 @@ public class ResourceManagerServiceImpl implements ResourceManagerService, Leade
     @Override
     public CompletableFuture<Void> deregisterApplication(
             final ApplicationStatus applicationStatus, final @Nullable String diagnostics) {
+        // Defer deregistration until the leader RM has fully started (FLINK-25893).
         synchronized (lock) {
-            if (running && leaderResourceManager != null) {
-                return leaderResourceManager
-                        .getSelfGateway(ResourceManagerGateway.class)
-                        .deregisterApplication(applicationStatus, diagnostics)
-                        .thenApply(ack -> null);
-            } else {
-                return FutureUtils.completedExceptionally(
-                        new FlinkException(
-                                "Cannot deregister application. Resource manager service is not available."));
+            if (!running || leaderResourceManager == null) {
+                return deregisterWithoutLeaderRm();
             }
+            // Snapshot the leader now; leadership is re-checked once the RM has started.
+            final ResourceManager<?> currentLeaderRM = leaderResourceManager;
+            return currentLeaderRM
+                    .getStartedFuture()
+                    .thenCompose(
+                            ignore -> {
+                                synchronized (lock) { // may run after outer lock is released
+                                    if (isLeader(currentLeaderRM)) {
+                                        return currentLeaderRM
+                                                .getSelfGateway(ResourceManagerGateway.class)
+                                                .deregisterApplication(
+                                                        applicationStatus, diagnostics)
+                                                .thenApply(ack -> null);
+                                    } else {
+                                        return deregisterWithoutLeaderRm();
+                                    }
+                                }
+                            });
         }
     }
 
+    private static CompletableFuture<Void> deregisterWithoutLeaderRm() {
+        return FutureUtils.completedExceptionally( // fail fast without contacting any RM
+                new FlinkException(
+                        "Cannot deregister application. Resource manager service is not available."));
+    }
+
     @Override
     public CompletableFuture<Void> closeAsync() {
         synchronized (lock) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
index 3242fc7..e365faa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -448,6 +449,39 @@ public class ResourceManagerServiceImplTest extends TestLogger {
         closeServiceFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
     }
 
+    @Test
+    public void deregisterApplication_leaderRmNotStarted() throws Exception {
+        final CompletableFuture<Void> startRmInitializationFuture = new CompletableFuture<>();
+        final CompletableFuture<Void> finishRmInitializationFuture = new CompletableFuture<>();
+        // Initialization reports that it has begun, then blocks until the test releases it.
+        rmFactoryBuilder.setInitializeConsumer(
+                (ignore) -> {
+                    startRmInitializationFuture.complete(null);
+                    blockOnFuture(finishRmInitializationFuture);
+                });
+
+        createAndStartResourceManager();
+
+        // grant leadership
+        leaderElectionService.isLeader(UUID.randomUUID());
+
+        // wait until the leader RM has been created and is initializing
+        startRmInitializationFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
+
+        // deregister application
+        final CompletableFuture<Void> deregisterApplicationFuture =
+                resourceManagerService.deregisterApplication(ApplicationStatus.CANCELED, null);
+
+        // RM is not fully started yet, so deregistration must not complete
+        assertNotComplete(deregisterApplicationFuture);
+
+        // unblock RM initialization so the RM can finish starting
+        finishRmInitializationFuture.complete(null);
+
+        // deregistration should now complete (RM-side receipt is not asserted)
+        deregisterApplicationFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
+    }
+
     private static void blockOnFuture(CompletableFuture<?> future) {
         try {
             future.get();