You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2023/01/24 12:40:42 UTC

[flink] branch master updated: [FLINK-30759][runtime] Merges DispatcherRunnerLeaderElectionLifecycleManager into DefaultDispatcherRunner

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d1d61c68d8 [FLINK-30759][runtime] Merges DispatcherRunnerLeaderElectionLifecycleManager into DefaultDispatcherRunner
2d1d61c68d8 is described below

commit 2d1d61c68d81eba0f721b10eaf2a7246aa0814c2
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Thu Jan 19 18:06:42 2023 +0100

    [FLINK-30759][runtime] Merges DispatcherRunnerLeaderElectionLifecycleManager into DefaultDispatcherRunner
    
    The entire purpose of this wrapper
    DispatcherRunnerLeaderElectionLifecycleManager
    is to have the start/stop of the LeaderElectionService
    out of the DefaultDispatcherRunner. FLINK-26522/FLIP-285
    will move this logic into the HighAvailabilityServices.
    Merging both classes makes the move easier because it
    aligns it with the other LeaderContender implementations.
    
    Signed-off-by: Matthias Pohl <ma...@aiven.io>
---
 .../dispatcher/runner/DefaultDispatcherRunner.java | 23 ++++++-
 ...atcherRunnerLeaderElectionLifecycleManager.java | 73 ----------------------
 .../runner/DefaultDispatcherRunnerTest.java        | 10 +++
 3 files changed, 30 insertions(+), 76 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
index 2d9c2207412..904750d290a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
@@ -74,6 +74,10 @@ public final class DefaultDispatcherRunner implements DispatcherRunner, LeaderCo
                 CompletableFuture.completedFuture(null);
     }
 
+    void start() throws Exception {
+        leaderElectionService.start(this);
+    }
+
     @Override
     public CompletableFuture<ApplicationStatus> getShutDownFuture() {
         return shutDownFuture;
@@ -89,11 +93,14 @@ public final class DefaultDispatcherRunner implements DispatcherRunner, LeaderCo
             }
         }
 
+        final CompletableFuture<Void> stopLeaderElectionServiceFuture = stopLeaderElectionService();
+
         stopDispatcherLeaderProcess();
 
         FutureUtils.forward(previousDispatcherLeaderProcessTerminationFuture, terminationFuture);
 
-        return terminationFuture;
+        return FutureUtils.completeAll(
+                Arrays.asList(terminationFuture, stopLeaderElectionServiceFuture));
     }
 
     // ---------------------------------------------------------------
@@ -191,6 +198,16 @@ public final class DefaultDispatcherRunner implements DispatcherRunner, LeaderCo
                 });
     }
 
+    private CompletableFuture<Void> stopLeaderElectionService() {
+        try {
+            leaderElectionService.stop();
+        } catch (Exception e) {
+            return FutureUtils.completedExceptionally(e);
+        }
+
+        return FutureUtils.completedVoidFuture();
+    }
+
     private void runActionIfRunning(Runnable runnable) {
         synchronized (lock) {
             if (running) {
@@ -221,7 +238,7 @@ public final class DefaultDispatcherRunner implements DispatcherRunner, LeaderCo
         final DefaultDispatcherRunner dispatcherRunner =
                 new DefaultDispatcherRunner(
                         leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
-        return DispatcherRunnerLeaderElectionLifecycleManager.createFor(
-                dispatcherRunner, leaderElectionService);
+        dispatcherRunner.start();
+        return dispatcherRunner;
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerLeaderElectionLifecycleManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerLeaderElectionLifecycleManager.java
deleted file mode 100644
index 3b053c97b5e..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerLeaderElectionLifecycleManager.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.dispatcher.runner;
-
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.leaderelection.LeaderContender;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.util.concurrent.FutureUtils;
-
-import java.util.Arrays;
-import java.util.concurrent.CompletableFuture;
-
-final class DispatcherRunnerLeaderElectionLifecycleManager<
-                T extends DispatcherRunner & LeaderContender>
-        implements DispatcherRunner {
-    private final T dispatcherRunner;
-    private final LeaderElectionService leaderElectionService;
-
-    private DispatcherRunnerLeaderElectionLifecycleManager(
-            T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
-        this.dispatcherRunner = dispatcherRunner;
-        this.leaderElectionService = leaderElectionService;
-
-        leaderElectionService.start(dispatcherRunner);
-    }
-
-    @Override
-    public CompletableFuture<ApplicationStatus> getShutDownFuture() {
-        return dispatcherRunner.getShutDownFuture();
-    }
-
-    @Override
-    public CompletableFuture<Void> closeAsync() {
-        final CompletableFuture<Void> servicesTerminationFuture = stopServices();
-        final CompletableFuture<Void> dispatcherRunnerTerminationFuture =
-                dispatcherRunner.closeAsync();
-
-        return FutureUtils.completeAll(
-                Arrays.asList(servicesTerminationFuture, dispatcherRunnerTerminationFuture));
-    }
-
-    private CompletableFuture<Void> stopServices() {
-        try {
-            leaderElectionService.stop();
-        } catch (Exception e) {
-            return FutureUtils.completedExceptionally(e);
-        }
-
-        return FutureUtils.completedVoidFuture();
-    }
-
-    public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(
-            T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
-        return new DispatcherRunnerLeaderElectionLifecycleManager<>(
-                dispatcherRunner, leaderElectionService);
-    }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java
index 8a67ce06382..c43d93754e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeoutException;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /** Tests for the {@link DefaultDispatcherRunner}. */
@@ -68,6 +69,15 @@ public class DefaultDispatcherRunnerTest extends TestLogger {
         }
     }
 
+    @Test
+    public void testLeaderElectionLifecycle() throws Exception {
+        assertTrue(testingLeaderElectionService.isStopped());
+        try (final DispatcherRunner unusedDisptacherRunner = createDispatcherRunner()) {
+            assertFalse(testingLeaderElectionService.isStopped());
+        }
+        assertTrue(testingLeaderElectionService.isStopped());
+    }
+
     @Test
     public void closeAsync_doesNotCompleteUncompletedShutDownFuture() throws Exception {
         final DispatcherRunner dispatcherRunner = createDispatcherRunner();