You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2023/01/24 12:40:42 UTC
[flink] branch master updated: [FLINK-30759][runtime] Merges DispatcherRunnerLeaderElectionLifecycleManager into DefaultDispatcherRunner
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2d1d61c68d8 [FLINK-30759][runtime] Merges DispatcherRunnerLeaderElectionLifecycleManager into DefaultDispatcherRunner
2d1d61c68d8 is described below
commit 2d1d61c68d81eba0f721b10eaf2a7246aa0814c2
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Thu Jan 19 18:06:42 2023 +0100
[FLINK-30759][runtime] Merges DispatcherRunnerLeaderElectionLifecycleManager into DefaultDispatcherRunner
The entire purpose of this wrapper
DispatcherRunnerLeaderElectionLifecycleManager
is to have the start/stop of the LeaderElectionService
out of the DefaultDispatcherRunner. FLINK-26522/FLIP-285
will move this logic into the HighAvailabilityServices.
Merging both classes makes the move easier because it
aligns it with the other LeaderContender implementations.
Signed-off-by: Matthias Pohl <ma...@aiven.io>
---
.../dispatcher/runner/DefaultDispatcherRunner.java | 23 ++++++-
...atcherRunnerLeaderElectionLifecycleManager.java | 73 ----------------------
.../runner/DefaultDispatcherRunnerTest.java | 10 +++
3 files changed, 30 insertions(+), 76 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
index 2d9c2207412..904750d290a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
@@ -74,6 +74,10 @@ public final class DefaultDispatcherRunner implements DispatcherRunner, LeaderCo
CompletableFuture.completedFuture(null);
}
+ void start() throws Exception {
+ leaderElectionService.start(this);
+ }
+
@Override
public CompletableFuture<ApplicationStatus> getShutDownFuture() {
return shutDownFuture;
@@ -89,11 +93,14 @@ public final class DefaultDispatcherRunner implements DispatcherRunner, LeaderCo
}
}
+ final CompletableFuture<Void> stopLeaderElectionServiceFuture = stopLeaderElectionService();
+
stopDispatcherLeaderProcess();
FutureUtils.forward(previousDispatcherLeaderProcessTerminationFuture, terminationFuture);
- return terminationFuture;
+ return FutureUtils.completeAll(
+ Arrays.asList(terminationFuture, stopLeaderElectionServiceFuture));
}
// ---------------------------------------------------------------
@@ -191,6 +198,16 @@ public final class DefaultDispatcherRunner implements DispatcherRunner, LeaderCo
});
}
+ private CompletableFuture<Void> stopLeaderElectionService() {
+ try {
+ leaderElectionService.stop();
+ } catch (Exception e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+
+ return FutureUtils.completedVoidFuture();
+ }
+
private void runActionIfRunning(Runnable runnable) {
synchronized (lock) {
if (running) {
@@ -221,7 +238,7 @@ public final class DefaultDispatcherRunner implements DispatcherRunner, LeaderCo
final DefaultDispatcherRunner dispatcherRunner =
new DefaultDispatcherRunner(
leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
- return DispatcherRunnerLeaderElectionLifecycleManager.createFor(
- dispatcherRunner, leaderElectionService);
+ dispatcherRunner.start();
+ return dispatcherRunner;
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerLeaderElectionLifecycleManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerLeaderElectionLifecycleManager.java
deleted file mode 100644
index 3b053c97b5e..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerLeaderElectionLifecycleManager.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.dispatcher.runner;
-
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.leaderelection.LeaderContender;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.util.concurrent.FutureUtils;
-
-import java.util.Arrays;
-import java.util.concurrent.CompletableFuture;
-
-final class DispatcherRunnerLeaderElectionLifecycleManager<
- T extends DispatcherRunner & LeaderContender>
- implements DispatcherRunner {
- private final T dispatcherRunner;
- private final LeaderElectionService leaderElectionService;
-
- private DispatcherRunnerLeaderElectionLifecycleManager(
- T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
- this.dispatcherRunner = dispatcherRunner;
- this.leaderElectionService = leaderElectionService;
-
- leaderElectionService.start(dispatcherRunner);
- }
-
- @Override
- public CompletableFuture<ApplicationStatus> getShutDownFuture() {
- return dispatcherRunner.getShutDownFuture();
- }
-
- @Override
- public CompletableFuture<Void> closeAsync() {
- final CompletableFuture<Void> servicesTerminationFuture = stopServices();
- final CompletableFuture<Void> dispatcherRunnerTerminationFuture =
- dispatcherRunner.closeAsync();
-
- return FutureUtils.completeAll(
- Arrays.asList(servicesTerminationFuture, dispatcherRunnerTerminationFuture));
- }
-
- private CompletableFuture<Void> stopServices() {
- try {
- leaderElectionService.stop();
- } catch (Exception e) {
- return FutureUtils.completedExceptionally(e);
- }
-
- return FutureUtils.completedVoidFuture();
- }
-
- public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(
- T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
- return new DispatcherRunnerLeaderElectionLifecycleManager<>(
- dispatcherRunner, leaderElectionService);
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java
index 8a67ce06382..c43d93754e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeoutException;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** Tests for the {@link DefaultDispatcherRunner}. */
@@ -68,6 +69,15 @@ public class DefaultDispatcherRunnerTest extends TestLogger {
}
}
+ @Test
+ public void testLeaderElectionLifecycle() throws Exception {
+ assertTrue(testingLeaderElectionService.isStopped());
+ try (final DispatcherRunner unusedDisptacherRunner = createDispatcherRunner()) {
+ assertFalse(testingLeaderElectionService.isStopped());
+ }
+ assertTrue(testingLeaderElectionService.isStopped());
+ }
+
@Test
public void closeAsync_doesNotCompleteUncompletedShutDownFuture() throws Exception {
final DispatcherRunner dispatcherRunner = createDispatcherRunner();