You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/02/21 03:37:35 UTC
[flink] 01/03: [FLINK-25893][runtime] Fix ResourceManagerServiceImpl potentially calling ResourceManager#deregisterApplication before the RM is fully started.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d0c8547819d726ed8879132d82daf2129b086396
Author: Xintong Song <to...@gmail.com>
AuthorDate: Sun Feb 20 17:32:25 2022 +0800
[FLINK-25893][runtime] Fix ResourceManagerServiceImpl potentially calling ResourceManager#deregisterApplication before the RM is fully started.
---
.../ResourceManagerServiceImpl.java | 36 ++++++++++++++++------
.../ResourceManagerServiceImplTest.java | 34 ++++++++++++++++++++
2 files changed, 61 insertions(+), 9 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
index e0add8e..692993a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
@@ -128,20 +128,38 @@ public class ResourceManagerServiceImpl implements ResourceManagerService, Leade
@Override
public CompletableFuture<Void> deregisterApplication(
final ApplicationStatus applicationStatus, final @Nullable String diagnostics) {
+
synchronized (lock) {
- if (running && leaderResourceManager != null) {
- return leaderResourceManager
- .getSelfGateway(ResourceManagerGateway.class)
- .deregisterApplication(applicationStatus, diagnostics)
- .thenApply(ack -> null);
- } else {
- return FutureUtils.completedExceptionally(
- new FlinkException(
- "Cannot deregister application. Resource manager service is not available."));
+ if (!running || leaderResourceManager == null) {
+ return deregisterWithoutLeaderRm();
}
+
+ final ResourceManager<?> currentLeaderRM = leaderResourceManager;
+ return currentLeaderRM
+ .getStartedFuture()
+ .thenCompose(
+ ignore -> {
+ synchronized (lock) {
+ if (isLeader(currentLeaderRM)) {
+ return currentLeaderRM
+ .getSelfGateway(ResourceManagerGateway.class)
+ .deregisterApplication(
+ applicationStatus, diagnostics)
+ .thenApply(ack -> null);
+ } else {
+ return deregisterWithoutLeaderRm();
+ }
+ }
+ });
}
}
+ private static CompletableFuture<Void> deregisterWithoutLeaderRm() {
+ return FutureUtils.completedExceptionally(
+ new FlinkException(
+ "Cannot deregister application. Resource manager service is not available."));
+ }
+
@Override
public CompletableFuture<Void> closeAsync() {
synchronized (lock) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
index 3242fc7..e365faa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -448,6 +449,39 @@ public class ResourceManagerServiceImplTest extends TestLogger {
closeServiceFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
}
+ @Test
+ public void deregisterApplication_leaderRmNotStarted() throws Exception {
+ final CompletableFuture<Void> startRmInitializationFuture = new CompletableFuture<>();
+ final CompletableFuture<Void> finishRmInitializationFuture = new CompletableFuture<>();
+
+ rmFactoryBuilder.setInitializeConsumer(
+ (ignore) -> {
+ startRmInitializationFuture.complete(null);
+ blockOnFuture(finishRmInitializationFuture);
+ });
+
+ createAndStartResourceManager();
+
+ // grant leadership
+ leaderElectionService.isLeader(UUID.randomUUID());
+
+ // make sure leader RM is created
+ startRmInitializationFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
+
+ // deregister application
+ final CompletableFuture<Void> deregisterApplicationFuture =
+ resourceManagerService.deregisterApplication(ApplicationStatus.CANCELED, null);
+
+ // RM not fully started, future should not complete
+ assertNotComplete(deregisterApplicationFuture);
+
+ // finish starting RM
+ finishRmInitializationFuture.complete(null);
+
+ // should perform deregistration
+ deregisterApplicationFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
+ }
+
private static void blockOnFuture(CompletableFuture<?> future) {
try {
future.get();