Posted to issues@flink.apache.org by "XComp (via GitHub)" <gi...@apache.org> on 2023/04/11 13:15:58 UTC

[GitHub] [flink] XComp opened a new pull request, #22380: [FLINK-31773][runtime] Introduces LeaderElectionService.LeaderElection interface

XComp opened a new pull request, #22380:
URL: https://github.com/apache/flink/pull/22380

   - [X] Introduces LeaderElectionService.LeaderElection interface (FLINK-31768)
   - [ ] Separate `LeaderElectionService.start()` into two methods: `startLeaderElectionBackend` for initializing the HA backend and `createLeaderElection(LeaderContender)` for registering the `LeaderContender`
   - [ ] Move registering the `LeaderContender` out of `LeaderElectionService.createLeaderElection` into `LeaderElection.register(LeaderContender)`
   - [ ] Introduce `contenderID` in `LeaderElectionService.createLeaderElection`
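
A minimal sketch of the lifecycle that the checklist above works toward, assuming a single in-memory backend. Only the names `startLeaderElectionBackend`, `createLeaderElection`, `LeaderElection.register(LeaderContender)` and `contenderID` come from the checklist. The `InMemoryLeaderElectionService` class, the `grantLeadership` trigger, the `"resource_manager"` ID and all signatures are hypothetical and are not Flink's actual API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical sketch of the two-phase lifecycle; not Flink's actual API. */
final class LeaderElectionLifecycleSketch {

    /** Hypothetical contender callback, loosely modeled on Flink's LeaderContender. */
    interface LeaderContender {
        void grantLeadership(UUID sessionID);
    }

    /** Hypothetical per-contender handle returned by createLeaderElection. */
    interface LeaderElection {
        void register(LeaderContender contender);
    }

    /** Hypothetical in-memory stand-in for a service backed by an HA backend. */
    static final class InMemoryLeaderElectionService {
        private final Map<String, LeaderContender> contenders = new HashMap<>();
        private boolean backendStarted;

        /** Phase 1: connect to the HA backend, independently of any contender. */
        void startLeaderElectionBackend() {
            backendStarted = true;
        }

        /** Phase 2: hand out a handle for one contender, identified by contenderID. */
        LeaderElection createLeaderElection(String contenderID) {
            if (!backendStarted) {
                throw new IllegalStateException("The backend has to be started first.");
            }
            // Registration is deferred to the returned handle.
            return contender -> contenders.put(contenderID, contender);
        }

        /** Simulates the backend electing this process; all registered contenders are informed. */
        UUID grantLeadership() {
            final UUID sessionID = UUID.randomUUID();
            contenders.values().forEach(c -> c.grantLeadership(sessionID));
            return sessionID;
        }
    }

    public static void main(String[] args) {
        final InMemoryLeaderElectionService service = new InMemoryLeaderElectionService();
        service.startLeaderElectionBackend();

        final UUID[] received = new UUID[1];
        service.createLeaderElection("resource_manager").register(id -> received[0] = id);

        final UUID issued = service.grantLeadership();
        System.out.println(issued.equals(received[0])); // prints "true"
    }
}
```

Separating the two phases means the backend connection can exist before any contender is known, which is what allows several contenders to share one backend later on.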
   
   ## What is the purpose of the change
   
   This change introduces the new interface `LeaderElectionService.LeaderElection`. It's part of a series of individual PRs that introduce the new interfaces described in [FLIP-285](https://cwiki.apache.org/confluence/display/FLINK/FLIP-285%3A+Refactoring+LeaderElection+to+make+Flink+support+multi-component+leader+election+out-of-the-box).
   
   ## Brief change log
   
   * Introduces `LeaderElectionService.LeaderElection`
   * Includes this interface in existing implementations
   
   ## Verifying this change
   
   Tests should succeed as usual. No functionality is changed.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] XComp commented on a diff in pull request #22380: [WIP][FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1163950315


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionUtils.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public class LeaderElectionUtils {
+
+    public static String convertToString(
+            Map<String, LeaderInformation> leaderInformationPerContender) {
+        final Map<String, String> convertedMap =

Review Comment:
   I see your point. I'm going to remove these methods for now. They aren't used yet but will be needed in a future PR; maybe I'll find a better approach by then.





[GitHub] [flink] zentol commented on a diff in pull request #22380: [FLINK-31773][runtime] Introduces LeaderElectionService.LeaderElection interface

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1163856038


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionUtils.java:
##########
@@ -0,0 +1,63 @@
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public class LeaderElectionUtils {
+
+    public static String convertToString(
+            Map<String, LeaderInformation> leaderInformationPerContender) {
+        final Map<String, String> convertedMap =

Review Comment:
   Having to construct another map purely for value string conversions seems wrong.
   ```java
   map.entrySet().stream()
     .map(entry -> entry.getKey() + "=" + convertToString(entry.getValue()))
     .collect(Collectors.joining(", "));
   ```
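
A self-contained version of the suggestion above, for trying it out. Flink's `LeaderInformation` is replaced by a hypothetical two-field `LeaderInfo` stand-in, and the per-value `convertToString` is a hypothetical formatter, since neither real implementation is shown in this thread. A `TreeMap` is used only to make the output order deterministic.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.stream.Collectors;

final class LeaderInformationFormatting {

    /** Hypothetical stand-in for Flink's LeaderInformation (session ID plus address). */
    static final class LeaderInfo {
        final UUID sessionID;
        final String address;

        LeaderInfo(UUID sessionID, String address) {
            this.sessionID = sessionID;
            this.address = address;
        }
    }

    /** Hypothetical per-value formatter. */
    static String convertToString(LeaderInfo info) {
        return "LeaderInformation{sessionID=" + info.sessionID + ", address=" + info.address + "}";
    }

    /** Joins the entries directly instead of building an intermediate Map<String, String>. */
    static String convertToString(Map<String, LeaderInfo> leaderInformationPerContender) {
        return leaderInformationPerContender.entrySet().stream()
                .map(entry -> entry.getKey() + "=" + convertToString(entry.getValue()))
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        final Map<String, LeaderInfo> perContender = new TreeMap<>();
        perContender.put("dispatcher", new LeaderInfo(new UUID(0L, 1L), "host-a:6123"));
        perContender.put("rest", new LeaderInfo(new UUID(0L, 2L), "host-b:8081"));
        System.out.println(convertToString(perContender));
    }
}
```

An empty map simply yields an empty string, because `Collectors.joining` emits nothing when there are no elements.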
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -77,22 +97,44 @@ public DefaultLeaderElectionService(LeaderElectionDriverFactory leaderElectionDr
         this.leaderElectionDriver = null;
 
         this.confirmedLeaderInformation = LeaderInformation.empty();
+    }
 
-        this.running = false;
+    public void startLeaderElectionBackend() throws Exception {

Review Comment:
   What am I missing that makes this usable? This isn't called anywhere in production code, is it?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionUtils.java:
##########
@@ -0,0 +1,63 @@
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public class LeaderElectionUtils {

Review Comment:
   missing javadoc





[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169862792


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} iff the leadership

Review Comment:
   Now I get your initial statement :bulb:





[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169762851


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -69,6 +71,118 @@ void testOnGrantAndRevokeLeadership() throws Exception {
         };
     }
 
+    @Test
+    void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            final CompletableFuture<Void> grantLeadershipFuture = new CompletableFuture<>();
+            driver.isLeader(grantLeadershipFuture);
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            assertThat(testInstance.getLeaderSessionID())
+                    .as("Leadership grant was not forwarded to the contender, yet.")
+                    .isNull();
+
+            grantLeadershipFuture.complete(null);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * Test to cover the issue described in FLINK-31814. This test could be removed after
+     * FLINK-31814 is resolved.
+     */
+    @Test
+    void testOnRevokeCallWhileClosingService() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
+                        LeaderElectionEventHandler::onRevokeLeadership);
+
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            driver.isLeader();
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * This issue can happen when the shutdown of the contender takes too long and the leadership is
+     * re-acquired in the meantime (see FLINK-29234).
+     */
+    @Test
+    void testStopWhileHavingLeadership() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();

Review Comment:
   Ah, now I remember why I added this method in the first place. With the `MultipleComponentLeaderElection*` classes, we introduced a circular dependency between `DefaultLeaderElectionService` and `DefaultMultipleComponentLeaderElectionService`: the latter calls `DefaultLeaderElectionService.onGrantLeadership` while registering the service in [DefaultMultipleComponentLeaderElectionService:152](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L152). This call accesses the `DefaultLeaderElectionService` instance while it is still being instantiated.
   
   I created FLINK-31837 to cover this issue in a follow-up task.
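
The problem described above is a variant of the classic "`this` escaping the constructor" pitfall. The sketch below uses hypothetical classes (`CallbackRegistry`, `EagerService`, `TwoPhaseService`) that only mimic the call pattern and are not Flink's actual classes. It shows that registering from within the constructor lets a synchronous callback observe a half-initialized instance, and that moving the registration into a separate start step (in the spirit of `startLeaderElectionBackend()`) avoids it.

```java
import java.util.UUID;
import java.util.function.Consumer;

final class ThisEscapeSketch {

    /** Hypothetical registry that calls back synchronously while registering. */
    static final class CallbackRegistry {
        void register(Consumer<UUID> onGrant) {
            onGrant.accept(UUID.randomUUID()); // callback fires during registration
        }
    }

    /** Registers itself inside its constructor, so 'this' escapes too early. */
    static final class EagerService {
        String observedDescription;
        private final String description;

        EagerService(CallbackRegistry registry) {
            registry.register(this::onGrantLeadership); // runs before 'description' is assigned
            this.description = "eager-service";
        }

        void onGrantLeadership(UUID sessionID) {
            observedDescription = description; // reads null when called from the constructor
        }
    }

    /** The constructor only assigns fields; registration happens in a separate step. */
    static final class TwoPhaseService {
        String observedDescription;
        private final String description;
        private final CallbackRegistry registry;

        TwoPhaseService(CallbackRegistry registry) {
            this.registry = registry;
            this.description = "two-phase-service";
        }

        /** Hypothetical counterpart of startLeaderElectionBackend(), called after construction. */
        void start() {
            registry.register(this::onGrantLeadership);
        }

        void onGrantLeadership(UUID sessionID) {
            observedDescription = description;
        }
    }

    public static void main(String[] args) {
        final CallbackRegistry registry = new CallbackRegistry();
        System.out.println(new EagerService(registry).observedDescription); // prints "null"

        final TwoPhaseService service = new TwoPhaseService(registry);
        service.start();
        System.out.println(service.observedDescription); // prints "two-phase-service"
    }
}
```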





[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169800445


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} iff the leadership
+     * is acquired by this service. {@code issuedLeaderSessionID} being {@code null} indicates that
+     * this service isn't the leader right now (i.e. {@code
+     * leaderElectionDriver.hasLeadership(UUID)} would return {@code false} for any session ID.
+     */
     @GuardedBy("lock")
     @Nullable
-    private volatile UUID issuedLeaderSessionID;
+    private UUID issuedLeaderSessionID;
 
+    /**
+     * Saves the leader information for a registered {@link LeaderContender} after this contender
+     * confirmed the leadership.
+     */
     @GuardedBy("lock")
-    private volatile LeaderInformation confirmedLeaderInformation;
+    private LeaderInformation confirmedLeaderInformation;
 
+    /**
+     * {@code leaderElectionDriver} being {@code null} indicates that the connection to the
+     * LeaderElection backend isn't established, yet. See {@link #startLeaderElectionBackend()} and
+     * {@link #close()} for lifecycle management. The lifecycle of the driver should have been
+     * established before registering a {@link LeaderContender} and stopped after the contender has
+     * been removed.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    private volatile boolean running;
-
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private LeaderElectionDriver leaderElectionDriver;
+    private volatile LeaderElectionDriver leaderElectionDriver;

Review Comment:
   Because I somehow missed it when I removed the `volatile` keyword from all the other fields. :+1: :facepalm: 
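
The reasoning behind dropping `volatile` from lock-guarded fields: if every read and write of a field happens while holding the same monitor, the monitor's happens-before edges already guarantee visibility, so `volatile` adds nothing. A minimal sketch with a hypothetical class; the `@GuardedBy("lock")` annotation is only mentioned in a comment so that the block compiles without the `javax.annotation.concurrent` dependency.

```java
import java.util.UUID;

/** Hypothetical holder illustrating lock-guarded fields without 'volatile'. */
final class GuardedFieldsSketch {
    private final Object lock = new Object();

    // Would carry @GuardedBy("lock"). Every access below holds 'lock', so a write made
    // under the lock is visible to any thread that later acquires the same lock.
    private UUID issuedLeaderSessionID;

    void onGrantLeadership(UUID sessionID) {
        synchronized (lock) {
            issuedLeaderSessionID = sessionID;
        }
    }

    void onRevokeLeadership() {
        synchronized (lock) {
            issuedLeaderSessionID = null;
        }
    }

    UUID getLeaderSessionID() {
        synchronized (lock) {
            return issuedLeaderSessionID;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final GuardedFieldsSketch sketch = new GuardedFieldsSketch();
        final UUID sessionID = UUID.randomUUID();
        final Thread writer = new Thread(() -> sketch.onGrantLeadership(sessionID));
        writer.start();
        writer.join();
        System.out.println(sessionID.equals(sketch.getLeaderSessionID())); // prints "true"
    }
}
```

The argument only holds while no access bypasses the lock; a field that is also read outside `synchronized (lock)` would still need `volatile` or another safe-publication mechanism.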





[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169789210


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} iff the leadership

Review Comment:
   With "fix a type", do you mean "fix a typo"? :thinking: The `iff` was actually intended (see [Wikipedia](https://en.wikipedia.org/wiki/If_and_only_if#:~:text=In%20logic%20and%20related%20fields,true%20or%20both%20are%20false.)). But it probably causes more confusion than it's worth, so I'm going to replace `iff` with "if and only if".
   
   I also noticed that the comment itself isn't 100% correct in general. I fixed that.





[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1543473296

   This PR is ready for another review pass. I tried to keep the commits in reasonably sized chunks, hoping that this makes the review easier. The commits will be squashed at the end.




[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1193863651


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -32,49 +31,73 @@
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} if and only if the
+     * leadership is acquired by this service. {@code issuedLeaderSessionID} being {@code null}
+     * indicates that this service isn't the leader right now (i.e. {@link
+     * #onGrantLeadership(UUID)}) wasn't called, yet (independently of what {@code
+     * leaderElectionDriver#hasLeadership()} returns).
+     */
     @GuardedBy("lock")
     @Nullable
-    private volatile UUID issuedLeaderSessionID;
+    private UUID issuedLeaderSessionID;
 
+    /**
+     * Saves the leader information for a registered {@link LeaderContender} after this contender
+     * confirmed the leadership.
+     */
     @GuardedBy("lock")
-    private volatile LeaderInformation confirmedLeaderInformation;
+    private LeaderInformation confirmedLeaderInformation;
 
+    /**
+     * {@code leaderElectionDriver} being {@code null} indicates that the connection to the
+     * LeaderElection backend isn't established, yet. See {@link #startLeaderElectionBackend()} and
+     * {@link #close()} for lifecycle management. The lifecycle of the driver should have been
+     * established before registering a {@link LeaderContender} and stopped after the contender has
+     * been removed.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    private volatile boolean running;
+    private volatile LeaderElectionDriver leaderElectionDriver;
 
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private LeaderElectionDriver leaderElectionDriver;
-
-    private final ExecutorService leadershipOperationExecutor;
+    @Nullable private ExecutorService leadershipOperationExecutor;

Review Comment:
   Argh, good idea. I should have gone through the `ExecutorService` interface for such a check.





[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1193866384


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -270,65 +390,73 @@ public void onLeaderInformationChange(LeaderInformation leaderInformation) {
         runInLeaderEventThread(() -> onLeaderInformationChangeInternal(leaderInformation));
     }
 
+    @GuardedBy("lock")
     private void onLeaderInformationChangeInternal(LeaderInformation leaderInformation) {
-        synchronized (lock) {
-            if (running) {
-                LOG.trace(
-                        "Leader node changed while {} is the leader with session ID {}. New leader information {}.",
-                        leaderContender.getDescription(),
-                        confirmedLeaderInformation.getLeaderSessionID(),
-                        leaderInformation);
-                if (!confirmedLeaderInformation.isEmpty()) {
-                    final LeaderInformation confirmedLeaderInfo = this.confirmedLeaderInformation;
-                    if (leaderInformation.isEmpty()) {
-                        LOG.debug(
-                                "Writing leader information by {} since the external storage is empty.",
-                                leaderContender.getDescription());
-                        leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
-                    } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
-                        // the data field does not correspond to the expected leader information
-                        LOG.debug(
-                                "Correcting leader information by {}.",
-                                leaderContender.getDescription());
-                        leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
-                    }
+        if (leaderContender != null) {
+            LOG.trace(
+                    "Leader node changed while {} is the leader with {}. New leader information {}.",
+                    leaderContender.getDescription(),
+                    LeaderElectionUtils.convertToString(confirmedLeaderInformation),
+                    LeaderElectionUtils.convertToString(leaderInformation));
+            if (!confirmedLeaderInformation.isEmpty()) {
+                final LeaderInformation confirmedLeaderInfo = this.confirmedLeaderInformation;
+                if (leaderInformation.isEmpty()) {
+                    LOG.debug(
+                            "Writing leader information by {} since the external storage is empty.",
+                            leaderContender.getDescription());
+                    leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
+                } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
+                    // the data field does not correspond to the expected leader information
+                    LOG.debug(
+                            "Correcting leader information by {}.",
+                            leaderContender.getDescription());
+                    leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
                 }
-            } else {
-                LOG.debug(
-                        "Ignoring change notification since the {} has already been closed.",
-                        leaderElectionDriver);
             }
+        } else {
+            LOG.debug(
+                    "Ignoring change notification since the {} has already been stopped.",
+                    leaderElectionDriver);
         }
     }
 
     private void runInLeaderEventThread(Runnable callback) {
-        if (running) {
+        if (leadershipOperationExecutor != null) {

Review Comment:
   No, it's not: I missed that one (even when going over the PR last week -.-). I added a lock around the `execute` call and added the `@GuardedBy` annotation to `leadershipOperationExecutor`. Additionally, I removed the `volatile` modifier from `leaderElectionDriver`. That seems to have survived previous changes and isn't necessary anymore.
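The fix described in the reply above can be sketched roughly as follows. This is a minimal, hypothetical sketch, not the code in the PR. The class `LockGuardedEventDispatcher`, the boolean return value and the shutdown timeout are assumptions; only `runInLeaderEventThread` and `leadershipOperationExecutor` come from the diff. The point is that the null check and the `execute` call happen under the same lock that `close()` uses to clear the field.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, reduced stand-in for DefaultLeaderElectionService: it only shows how the
// executor reference is read, used and cleared under one lock.
class LockGuardedEventDispatcher implements AutoCloseable {

    private final Object lock = new Object();

    // Guarded by "lock" (Flink marks this with @GuardedBy("lock"); a comment is used here
    // to keep the sketch dependency-free). null means the dispatcher has been closed.
    private ExecutorService leadershipOperationExecutor = Executors.newSingleThreadExecutor();

    /** Returns true if the callback was handed to the executor, false if it was dropped. */
    boolean runInLeaderEventThread(Runnable callback) {
        synchronized (lock) {
            if (leadershipOperationExecutor != null) {
                // The null check and execute() are atomic with respect to close(),
                // so the executor cannot be shut down in between.
                leadershipOperationExecutor.execute(callback);
                return true;
            }
        }
        // Already closed: the event is ignored instead of hitting a shut-down executor.
        return false;
    }

    @Override
    public void close() {
        final ExecutorService executorToShutDown;
        synchronized (lock) {
            executorToShutDown = leadershipOperationExecutor;
            leadershipOperationExecutor = null;
        }
        if (executorToShutDown != null) {
            // Callbacks accepted before close() are still executed before termination.
            executorToShutDown.shutdown();
            try {
                executorToShutDown.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

Shutting the executor down outside of the lock keeps `close()` from blocking callers of `runInLeaderEventThread` while already queued callbacks drain.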



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Introduces LeaderElectionService.LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1505026405

   Sorry for not mentioning it before. This PR is still in draft mode because I ran into test failures (I should have updated the PR's description and title). My initial plan of introducing the `LeaderElection` interface first didn't work: it would have required more changes, blowing the PR up and making the overall change hard to review. I'm now trying a different approach, where I split the `LeaderElectionService` methods into pieces first before introducing the `LeaderElection` interface. I'm going to update the PR's description and title accordingly.




[GitHub] [flink] flinkbot commented on pull request #22380: [FLINK-31773][runtime] Introduces LeaderElectionService.LeaderElection interface

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1503340552

   ## CI report:
   
   * 6ee9591a34fb05b2ab3c64d333a3014778a7b59b UNKNOWN
   
   Bot commands
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build




[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169762851


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -69,6 +71,118 @@ void testOnGrantAndRevokeLeadership() throws Exception {
         };
     }
 
+    @Test
+    void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            final CompletableFuture<Void> grantLeadershipFuture = new CompletableFuture<>();
+            driver.isLeader(grantLeadershipFuture);
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            assertThat(testInstance.getLeaderSessionID())
+                    .as("Leadership grant was not forwarded to the contender, yet.")
+                    .isNull();
+
+            grantLeadershipFuture.complete(null);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * Test to cover the issue described in FLINK-31814. This test could be removed after
+     * FLINK-31814 is resolved.
+     */
+    @Test
+    void testOnRevokeCallWhileClosingService() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
+                        LeaderElectionEventHandler::onRevokeLeadership);
+
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            driver.isLeader();
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * This issue can happen when the shutdown of the contender takes too long and the leadership is
+     * re-acquired in the meantime (see FLINK-29234).
+     */
+    @Test
+    void testStopWhileHavingLeadership() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();

Review Comment:
   Ah, now I remember why I added this method in the first place. With the `MultipleComponentLeaderElection*` classes, we introduced a circular dependency between the `DefaultLeaderElectionService` and the `DefaultMultipleComponentLeaderElectionService`: the latter calls `DefaultLeaderElectionService.onGrantLeadership` while registering the service in [DefaultMultipleComponentLeaderElectionService:152](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L152). This call accesses the `DefaultLeaderElectionService` instance while it is still being instantiated.
   
   I created FLINK-31837 to cover this issue in a follow-up task and extended the method's javadoc accordingly.
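The hazard described here is the general Java problem of letting `this` escape from a constructor. The following sketch illustrates it with hypothetical classes (`GrantListener`, `EagerRegistry`, `EscapingComponent`, `TwoPhaseComponent`) that only mimic the shape of the problem: a registry that calls back synchronously during registration, as the comment describes for `DefaultMultipleComponentLeaderElectionService`. It is not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface standing in for the grant-leadership notification.
interface GrantListener {
    void onGrantLeadership(String sessionId);
}

// Hypothetical registry that calls back synchronously while register() is still running.
class EagerRegistry {
    private final String currentSessionId = "session-1";

    void register(GrantListener listener) {
        listener.onGrantLeadership(currentSessionId);
    }
}

// Registers itself from within its constructor: "this" escapes before construction ends.
class EscapingComponent implements GrantListener {
    private final List<String> grantedSessions;
    String observedStateOnGrant;

    EscapingComponent(EagerRegistry registry) {
        registry.register(this); // the callback runs before grantedSessions is assigned
        this.grantedSessions = new ArrayList<>();
    }

    @Override
    public void onGrantLeadership(String sessionId) {
        // Reads a final field that has not been assigned yet, so it is still null here.
        observedStateOnGrant = grantedSessions == null ? "uninitialized" : "initialized";
    }
}

// Two-phase variant: construction completes first, registration happens in start().
class TwoPhaseComponent implements GrantListener {
    private final List<String> grantedSessions = new ArrayList<>();

    void start(EagerRegistry registry) {
        registry.register(this);
    }

    @Override
    public void onGrantLeadership(String sessionId) {
        grantedSessions.add(sessionId);
    }

    List<String> grantedSessions() {
        return grantedSessions;
    }
}
```

Splitting construction from registration, as a separate `startLeaderElectionBackend()` step does, ensures the callback only ever sees a fully initialized object.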





[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169874448


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -70,22 +77,37 @@ public LeaderInformation getLeaderInformation() {
     }
 
     public void isLeader() {
+        isLeader(FutureUtils.completedVoidFuture());
+    }
+
+    public void isLeader(CompletableFuture<Void> grantLeadershipFuture) {
         synchronized (lock) {
             isLeader.set(true);
-            leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
+            grantLeadershipFuture.thenRun(
+                    () -> leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID()));

Review Comment:
   That's related to your [other comment](https://github.com/apache/flink/pull/22380#discussion_r1168771139). I'm inclined to halt this PR and create a separate ticket that moves the thread handling into the `DefaultLeaderElectionService`.
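One way to read "moving the thread handling into the `DefaultLeaderElectionService`" is that the service owns a single-threaded executor and funnels every leadership event through it. The sketch below assumes that design; `SerializingLeaderEventHandler` and its methods are hypothetical names, not the API that was eventually implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the service, not the driver, owns the thread on which leadership
// events are processed. Grant and revoke handling can therefore never overlap, whichever
// thread (HA backend or test) reports the event.
class SerializingLeaderEventHandler implements AutoCloseable {

    private final ExecutorService leadershipOperationExecutor =
            Executors.newSingleThreadExecutor();

    // Synchronized wrappers only so that callers can safely inspect them afterwards.
    private final List<String> processedEvents = Collections.synchronizedList(new ArrayList<>());
    private final Set<String> processingThreads = Collections.synchronizedSet(new HashSet<>());

    void onGrantLeadership(UUID leaderSessionID) {
        leadershipOperationExecutor.execute(() -> record("grant:" + leaderSessionID));
    }

    void onRevokeLeadership() {
        leadershipOperationExecutor.execute(() -> record("revoke"));
    }

    private void record(String event) {
        processingThreads.add(Thread.currentThread().getName());
        processedEvents.add(event);
    }

    List<String> processedEvents() {
        return processedEvents;
    }

    Set<String> processingThreads() {
        return processingThreads;
    }

    @Override
    public void close() {
        // Events submitted before close() are processed in submission order, then the thread ends.
        leadershipOperationExecutor.shutdown();
        try {
            leadershipOperationExecutor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With this design a testing driver could invoke the event handler synchronously again, because the ordering guarantee no longer depends on the driver's own locking.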



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] XComp commented on a diff in pull request #22380: [WIP][FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1163950315


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionUtils.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public class LeaderElectionUtils {
+
+    public static String convertToString(
+            Map<String, LeaderInformation> leaderInformationPerContender) {
+        final Map<String, String> convertedMap =

Review Comment:
   I'm gonna remove these methods for now. They are not used yet but will be used in a future PR.





[GitHub] [flink] zentol commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1168724716


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -70,22 +77,37 @@ public LeaderInformation getLeaderInformation() {
     }
 
     public void isLeader() {
+        isLeader(FutureUtils.completedVoidFuture());
+    }
+
+    public void isLeader(CompletableFuture<Void> grantLeadershipFuture) {
         synchronized (lock) {
             isLeader.set(true);
-            leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
+            grantLeadershipFuture.thenRun(
+                    () -> leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID()));

Review Comment:
   Why is it not a problem that onGrantLeadership is no longer called under the lock? Could this now result in concurrent onGrant/onRevoke calls being made?
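The question hinges on where the deferred callback runs. The hypothetical `DeferredGrantDriver` below reduces the quoted `isLeader(CompletableFuture)` change to its locking behavior. In practice (OpenJDK), `thenRun` on an already completed future runs the action immediately on the calling thread, so still under the lock, which corresponds to the `FutureUtils.completedVoidFuture()` path. For a pending future, the action runs later on whichever thread calls `complete`, outside the `synchronized` block.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical reduction of the TestingLeaderElectionDriver.isLeader(CompletableFuture)
// change quoted above; it only records whether the grant callback ran under the lock.
class DeferredGrantDriver {
    private final Object lock = new Object();

    // null until the grant callback has run.
    volatile Boolean grantRanUnderLock;

    void isLeader(CompletableFuture<Void> grantLeadershipFuture) {
        synchronized (lock) {
            // Already completed future: the action runs right here, while the lock is held.
            // Pending future: the action is only registered and runs later on the thread
            // that completes the future, after this synchronized block has been left.
            grantLeadershipFuture.thenRun(() -> grantRanUnderLock = Thread.holdsLock(lock));
        }
    }
}
```

In the delayed case, nothing in this reduced sketch stops another thread from triggering a revoke while the grant is still being processed, which is the concurrency concern raised above.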



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -69,6 +71,118 @@ void testOnGrantAndRevokeLeadership() throws Exception {
         };
     }
 
+    @Test
+    void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            final CompletableFuture<Void> grantLeadershipFuture = new CompletableFuture<>();
+            driver.isLeader(grantLeadershipFuture);
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            assertThat(testInstance.getLeaderSessionID())
+                    .as("Leadership grant was not forwarded to the contender, yet.")
+                    .isNull();
+
+            grantLeadershipFuture.complete(null);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * Test to cover the issue described in FLINK-31814. This test could be removed after
+     * FLINK-31814 is resolved.
+     */
+    @Test
+    void testOnRevokeCallWhileClosingService() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
+                        LeaderElectionEventHandler::onRevokeLeadership);
+
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            driver.isLeader();
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * This issue can happen when the shutdown of the contender takes too long and the leadership is
+     * re-acquired in the meantime (see FLINK-29234).

Review Comment:
   I don't understand how this comment relates to the test.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} iff the leadership

Review Comment:
   ```suggestion
        * Saves the session ID which was issued by the {@link LeaderElectionDriver} if the leadership
   ```
   Not looking forward to more PRs trying to "fix a typo".



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -69,6 +71,118 @@ void testOnGrantAndRevokeLeadership() throws Exception {
         };
     }
 
+    @Test
+    void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            final CompletableFuture<Void> grantLeadershipFuture = new CompletableFuture<>();
+            driver.isLeader(grantLeadershipFuture);
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            assertThat(testInstance.getLeaderSessionID())
+                    .as("Leadership grant was not forwarded to the contender, yet.")
+                    .isNull();
+
+            grantLeadershipFuture.complete(null);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * Test to cover the issue described in FLINK-31814. This test could be removed after
+     * FLINK-31814 is resolved.
+     */
+    @Test
+    void testOnRevokeCallWhileClosingService() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
+                        LeaderElectionEventHandler::onRevokeLeadership);
+
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            driver.isLeader();
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * This issue can happen when the shutdown of the contender takes too long and the leadership is
+     * re-acquired in the meantime (see FLINK-29234).
+     */
+    @Test
+    void testStopWhileHavingLeadership() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();

Review Comment:
   Why isn't this just part of the constructor?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderContender.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import java.util.UUID;
+import java.util.function.Consumer;
+
+/**
+ * {@code TestingLeaderContender} is a more generic implementation in comparison to {@link
+ * TestingContender}.
+ */
+public class TestingLeaderContender implements LeaderContender {
+
+    private final Consumer<UUID> grantLeadershipConsumer;
+    private final Runnable revokeLeadershipRunnable;
+    private final Consumer<Exception> handleErrorConsumer;
+
+    public TestingLeaderContender(
+            Consumer<UUID> grantLeadershipConsumer,
+            Runnable revokeLeadershipRunnable,
+            Consumer<Exception> handleErrorConsumer) {
+        this.grantLeadershipConsumer = grantLeadershipConsumer;
+        this.revokeLeadershipRunnable = revokeLeadershipRunnable;
+        this.handleErrorConsumer = handleErrorConsumer;
+    }
+
+    @Override
+    public void grantLeadership(UUID leaderSessionID) {
+        grantLeadershipConsumer.accept(leaderSessionID);
+    }
+
+    @Override
+    public void revokeLeadership() {
+        revokeLeadershipRunnable.run();
+    }
+
+    @Override
+    public void handleError(Exception exception) {
+        handleErrorConsumer.accept(exception);
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /** {@code Builder} for {@code TestingLeaderContender}. */
+    public static class Builder {

Review Comment:
   Add a private constructor to enforce a single instantiation path.
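The suggestion is the usual static-factory-plus-private-constructor builder pattern. Below is a sketch that assumes a reduced copy of `TestingLeaderContender`, renamed `TestingLeaderContenderSketch` so that it does not depend on Flink's `LeaderContender` interface. The setter names and defaults are assumptions, since the quoted diff does not show the builder's body.

```java
import java.util.UUID;
import java.util.function.Consumer;

// Hypothetical, reduced copy of TestingLeaderContender that compiles on its own.
class TestingLeaderContenderSketch {

    private final Consumer<UUID> grantLeadershipConsumer;
    private final Runnable revokeLeadershipRunnable;
    private final Consumer<Exception> handleErrorConsumer;

    // Private as well, so that the builder is the only way to create an instance.
    private TestingLeaderContenderSketch(
            Consumer<UUID> grantLeadershipConsumer,
            Runnable revokeLeadershipRunnable,
            Consumer<Exception> handleErrorConsumer) {
        this.grantLeadershipConsumer = grantLeadershipConsumer;
        this.revokeLeadershipRunnable = revokeLeadershipRunnable;
        this.handleErrorConsumer = handleErrorConsumer;
    }

    void grantLeadership(UUID leaderSessionID) {
        grantLeadershipConsumer.accept(leaderSessionID);
    }

    void revokeLeadership() {
        revokeLeadershipRunnable.run();
    }

    void handleError(Exception exception) {
        handleErrorConsumer.accept(exception);
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static class Builder {
        // Assumed defaults: ignore grant/revoke, fail fast on unexpected errors.
        private Consumer<UUID> grantLeadershipConsumer = ignoredSessionID -> {};
        private Runnable revokeLeadershipRunnable = () -> {};
        private Consumer<Exception> handleErrorConsumer =
                error -> {
                    throw new AssertionError("Unexpected error", error);
                };

        // Private constructor: newBuilder() is the single instantiation path.
        private Builder() {}

        Builder setGrantLeadershipConsumer(Consumer<UUID> grantLeadershipConsumer) {
            this.grantLeadershipConsumer = grantLeadershipConsumer;
            return this;
        }

        Builder setRevokeLeadershipRunnable(Runnable revokeLeadershipRunnable) {
            this.revokeLeadershipRunnable = revokeLeadershipRunnable;
            return this;
        }

        Builder setHandleErrorConsumer(Consumer<Exception> handleErrorConsumer) {
            this.handleErrorConsumer = handleErrorConsumer;
            return this;
        }

        TestingLeaderContenderSketch build() {
            return new TestingLeaderContenderSketch(
                    grantLeadershipConsumer, revokeLeadershipRunnable, handleErrorConsumer);
        }
    }
}
```

Callers then write `TestingLeaderContenderSketch.newBuilder().setGrantLeadershipConsumer(...).build()`, and `new Builder()` is rejected by the compiler outside the enclosing class.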



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -69,6 +71,118 @@ void testOnGrantAndRevokeLeadership() throws Exception {
         };
     }
 
+    @Test
+    void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            final CompletableFuture<Void> grantLeadershipFuture = new CompletableFuture<>();
+            driver.isLeader(grantLeadershipFuture);
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            assertThat(testInstance.getLeaderSessionID())
+                    .as("Leadership grant was not forwarded to the contender, yet.")
+                    .isNull();
+
+            grantLeadershipFuture.complete(null);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * Test to cover the issue described in FLINK-31814. This test could be removed after
+     * FLINK-31814 is resolved.
+     */
+    @Test
+    void testOnRevokeCallWhileClosingService() throws Exception {

Review Comment:
   Is the description in FLINK-31814 based on your WIP branch? Asking since there is neither a `DefaultLeaderElection.close()` nor a `DefaultLeaderElectionService.close` in the current master...



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -77,22 +100,48 @@ public DefaultLeaderElectionService(LeaderElectionDriverFactory leaderElectionDr
         this.leaderElectionDriver = null;
 
         this.confirmedLeaderInformation = LeaderInformation.empty();
+    }
 
-        this.running = false;
+    /**
+     * Starts the leader election process. This method has to be called before registering a {@link
+     * LeaderContender}.
+     */
+    public void startLeaderElectionBackend() throws Exception {
+        synchronized (lock) {
+            Preconditions.checkState(
+                    leaderContender == null,
+                    "No LeaderContender should have been registered, yet.");
+
+            leaderElectionDriver =
+                    leaderElectionDriverFactory.createLeaderElectionDriver(
+                            this, new LeaderElectionFatalErrorHandler());
+
+            LOG.info("Instantiating DefaultLeaderElectionService with {}.", leaderElectionDriver);
+        }
     }
 
     @Override
     public final void start(LeaderContender contender) throws Exception {
         checkNotNull(contender, "Contender must not be null.");
-        Preconditions.checkState(leaderContender == null, "Contender was already set.");
 
         synchronized (lock) {
-            running = true;
+            Preconditions.checkState(
+                    leaderContender == null,
+                    "Only one LeaderContender is allowed to be registered to this service.");
+            Preconditions.checkState(
+                    leaderElectionDriver != null,
+                    "The DefaultLeaderElectionService should have established a connection to the backend before it's started.");
+
             leaderContender = contender;
-            leaderElectionDriver =
-                    leaderElectionDriverFactory.createLeaderElectionDriver(
-                            this, new LeaderElectionFatalErrorHandler());
-            LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
+
+            LOG.info(
+                    "LeaderContender {} has been registered for {}.",
+                    contender.getDescription(),
+                    leaderElectionDriver);
+
+            if (hasLeadership()) {
+                notifyLeaderContenderOfLeadership();

Review Comment:
   Wondering if this should really happen in `start()` :thinking: That means it runs in a different thread than usual, right? Usually it'd be some HA backend thread, but now it's the main thread?
   
   Why isn't this calling `onGrantLeadership`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} iff the leadership
+     * is acquired by this service. {@code issuedLeaderSessionID} being {@code null} indicates that
+     * this service isn't the leader right now (i.e. {@code
+     * leaderElectionDriver.hasLeadership(UUID)} would return {@code false} for any session ID.
+     */
     @GuardedBy("lock")
     @Nullable
-    private volatile UUID issuedLeaderSessionID;
+    private UUID issuedLeaderSessionID;
 
+    /**
+     * Saves the leader information for a registered {@link LeaderContender} after this contender
+     * confirmed the leadership.
+     */
     @GuardedBy("lock")
-    private volatile LeaderInformation confirmedLeaderInformation;
+    private LeaderInformation confirmedLeaderInformation;
 
+    /**
+     * {@code leaderElectionDriver} being {@code null} indicates that the connection to the
+     * LeaderElection backend isn't established, yet. See {@link #startLeaderElectionBackend()} and
+     * {@link #close()} for lifecycle management. The lifecycle of the driver should have been
+     * established before registering a {@link LeaderContender} and stopped after the contender has
+     * been removed.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    private volatile boolean running;
-
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private LeaderElectionDriver leaderElectionDriver;
+    private volatile LeaderElectionDriver leaderElectionDriver;

Review Comment:
   why is this one now volatile when everything else isn't?
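For context on the question: under the Java memory model, releasing `synchronized (lock)` and a later acquire of the same lock already make writes visible to the reader. `volatile` therefore only adds something for reads that happen outside the lock. The sketch below assumes a hypothetical `GuardedStateSketch` class that borrows field names from the diff but is not Flink code.

```java
import java.util.UUID;

/**
 * Hypothetical sketch, not Flink code. All writes happen under `lock`. Fields
 * that are only read under the same lock need no `volatile`. A field that is
 * also read without the lock needs it for visibility.
 */
class GuardedStateSketch {
    private final Object lock = new Object();

    // Only accessed while holding `lock`: synchronized provides visibility.
    private UUID issuedLeaderSessionID;

    // Also read without the lock in isBackendRunning(), so it is volatile.
    private volatile Object driver;

    void start(Object newDriver) {
        synchronized (lock) {
            driver = newDriver;
        }
    }

    void grant(UUID sessionId) {
        synchronized (lock) {
            issuedLeaderSessionID = sessionId;
        }
    }

    boolean hasLeadership(UUID sessionId) {
        synchronized (lock) {
            return driver != null && sessionId.equals(issuedLeaderSessionID);
        }
    }

    // Unlocked read: it only sees the latest write because `driver` is volatile.
    boolean isBackendRunning() {
        return driver != null;
    }
}
```

If every access to the driver field went through the lock, the `volatile` modifier would be redundant. It only matters if some code path reads the field without synchronizing.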



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -145,96 +222,122 @@ public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
                 } else {
                     LOG.warn(
                             "The leader session ID {} was confirmed even though the "
-                                    + "corresponding JobManager was not elected as the leader.",
+                                    + "corresponding service was not elected as the leader or has been stopped already.",
                             leaderSessionID);
                 }
             }
         }
     }
 
+    @GuardedBy("lock")
+    private boolean hasLeadership() {
+        return leaderElectionDriver.hasLeadership() && issuedLeaderSessionID != null;
+    }
+
     @Override
     public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
         synchronized (lock) {
-            if (running) {
-                return leaderElectionDriver.hasLeadership()
-                        && leaderSessionId.equals(issuedLeaderSessionID);
+            if (leaderElectionDriver != null) {
+                if (leaderContender != null) {
+                    return hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
+                } else {
+                    LOG.debug(
+                            "hasLeadership is called after the LeaderContender was removed, returning false.");
+                    return false;
+                }
             } else {
-                LOG.debug("hasLeadership is called after the service is stopped, returning false.");
+                LOG.debug("hasLeadership is called after the service is closed, returning false.");
                 return false;
             }
         }
     }
 
-    /**
-     * Returns the current leader session ID or null, if the contender is not the leader.
-     *
-     * @return The last leader session ID or null, if the contender is not the leader
-     */
+    /** Returns the current leader session ID or {@code null}, if the session wasn't confirmed. */
     @VisibleForTesting
     @Nullable
     public UUID getLeaderSessionID() {
-        return confirmedLeaderInformation.getLeaderSessionID();
-    }
-
-    @GuardedBy("lock")
-    private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
-        confirmedLeaderInformation = LeaderInformation.known(leaderSessionID, leaderAddress);
-        leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
+        synchronized (lock) {
+            return confirmedLeaderInformation.getLeaderSessionID();
+        }
     }
 
     @Override
     public void onGrantLeadership(UUID newLeaderSessionId) {
+        Preconditions.checkNotNull(newLeaderSessionId);
+
         synchronized (lock) {
-            if (running) {
-                issuedLeaderSessionID = newLeaderSessionId;
-                confirmedLeaderInformation = LeaderInformation.empty();
+            Preconditions.checkState(
+                    issuedLeaderSessionID == null,
+                    "The leadership should have been granted while not having the leadership acquired.");
 
-                LOG.debug(
-                        "Grant leadership to contender {} with session ID {}.",
-                        leaderContender.getDescription(),
-                        issuedLeaderSessionID);
+            issuedLeaderSessionID = newLeaderSessionId;
 
-                leaderContender.grantLeadership(issuedLeaderSessionID);
+            if (leaderContender != null) {
+                notifyLeaderContenderOfLeadership();
             } else {
                 LOG.debug(
-                        "Ignoring the grant leadership notification since the {} has already been closed.",
+                        "The grant leadership notification is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.",
                         leaderElectionDriver);
             }
         }
     }
 
+    @GuardedBy("lock")
+    private void notifyLeaderContenderOfLeadership() {
+        Preconditions.checkState(
+                confirmedLeaderInformation.isEmpty(),
+                "The leadership should have been granted while not having the leadership acquired.");
+
+        LOG.debug(
+                "Granting leadership to contender {} with session ID {}.",
+                leaderContender.getDescription(),
+                issuedLeaderSessionID);
+
+        leaderContender.grantLeadership(issuedLeaderSessionID);
+    }
+
     @Override
     public void onRevokeLeadership() {
         synchronized (lock) {
-            if (running) {
-                handleLeadershipLoss();
+            // TODO: FLINK-31814 covers adding this Precondition
+            // Preconditions.checkState(issuedLeaderSessionID != null,"The leadership should have
+            // been revoked while having the leadership acquired.");
+
+            final UUID previousSessionID = issuedLeaderSessionID;
+            issuedLeaderSessionID = null;
+
+            if (leaderContender != null) {

Review Comment:
   Is this branching also related to FLINK-31814?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java:
##########
@@ -772,7 +773,11 @@ void testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip
             closeAsyncCalledTrigger.await();
 
             final CheckedThread grantLeadershipThread =
-                    createCheckedThread(currentLeaderDriver::isLeader);
+                    createCheckedThread(
+                            () -> {
+                                currentLeaderDriver.notLeader();
+                                currentLeaderDriver.isLeader();

Review Comment:
   ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zentol commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169836315


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} iff the leadership

Review Comment:
   I meant typo.
   
   I know what `iff` means. Many people don't, and open PRs to change it to `if`.




[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169844096


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java:
##########
@@ -302,6 +303,8 @@ void testZooKeeperReelectionWithReplacement() throws Exception {
 
                     // stop leader election service = revoke leadership
                     leaderElectionService[index].stop();
+                    leaderElectionService[index].close();

Review Comment:
   good catch :+1: 




[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1512910996

   Thanks for the comments, @zentol and @reswqa. I addressed them. @zentol raised some valid concerns about thread-safety that I missed in this PR. I decided to create FLINK-31878 and cover the thread-safety issue separately before going ahead with this (and the subsequent) PR(s).



[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169638735


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -69,6 +71,118 @@ void testOnGrantAndRevokeLeadership() throws Exception {
         };
     }
 
+    @Test
+    void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            final CompletableFuture<Void> grantLeadershipFuture = new CompletableFuture<>();
+            driver.isLeader(grantLeadershipFuture);
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            assertThat(testInstance.getLeaderSessionID())
+                    .as("Leadership grant was not forwarded to the contender, yet.")
+                    .isNull();
+
+            grantLeadershipFuture.complete(null);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * Test to cover the issue described in FLINK-31814. This test could be removed after
+     * FLINK-31814 is resolved.
+     */
+    @Test
+    void testOnRevokeCallWhileClosingService() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
+                        LeaderElectionEventHandler::onRevokeLeadership);
+
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            driver.isLeader();
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * This issue can happen when the shutdown of the contender takes too long and the leadership is
+     * re-acquired in the meantime (see FLINK-29234).
+     */
+    @Test
+    void testStopWhileHavingLeadership() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();

Review Comment:
   Initially, I thought of putting it in the public interface but realized that it's specific to the `DefaultLeaderElectionService` implementation and, therefore, removed it from the `LeaderElectionService` interface. It just didn't cross my mind to call the method in the constructor. But it's a valid point now that it's out of the interface. I will change it. :+1: 
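A minimal sketch of the direction discussed here, starting the backend from the constructor instead of a separate `startLeaderElectionBackend()` call. It assumes a hypothetical `DriverSketch` type and a `Supplier` in place of Flink's `LeaderElectionDriverFactory`. None of these names are the actual Flink API.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a leader election driver; not a Flink type. */
interface DriverSketch extends AutoCloseable {
    // Narrowed to not throw checked exceptions, which keeps callers simple.
    @Override
    void close();
}

/** Hypothetical sketch: a constructed instance always has a running backend. */
class BackendInConstructorSketch implements AutoCloseable {
    private final Object lock = new Object();

    // null once close() has run
    private DriverSketch driver;

    BackendInConstructorSketch(Supplier<DriverSketch> driverFactory) {
        // Replaces a separate startLeaderElectionBackend() call, so callers
        // cannot forget it or call it twice.
        this.driver = driverFactory.get();
    }

    boolean isBackendRunning() {
        synchronized (lock) {
            return driver != null;
        }
    }

    @Override
    public void close() {
        synchronized (lock) {
            // Idempotent: a second close() is a no-op.
            if (driver != null) {
                driver.close();
                driver = null;
            }
        }
    }
}
```

The trade-off is that the constructor now performs I/O-backed setup, so a failure to connect surfaces as a constructor exception rather than from a later lifecycle call.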




[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1538308882

   I rebased the branch onto PR #22544 (which is the FLINK-32029 change).



[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1549379543

   https://github.com/apache/flink/commit/76bd8bd7d4b7e4a559cbd9fc5f430e9276821e53 is just a rebase onto `master`.



[GitHub] [flink] reswqa commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1549541350

   > @reswqa anything to add from your side?
   
   Thanks @XComp and @zentol, looks good from my side 👍.



[GitHub] [flink] zentol commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1193802489


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderBase.java:
##########
@@ -80,6 +84,20 @@ public Throwable getError() {
         return error == null ? errorQueue.poll() : error;
     }
 
+    /**
+     * Method for exposing errors that were caught during the test execution and need to be exposed
+     * within the test.
+     */
+    public void throwErrorIfPresent() throws Throwable {
+        if (error != null) {
+            throw error;
+        }
+
+        if (!errorQueue.isEmpty()) {
+            throw errorQueue.poll();
+        }
+    }

Review Comment:
   Can't we wrap this into a runtime exception or something? We don't really want the `throws Throwable` clause propagating up the stack.
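One way to drop the `throws Throwable` clause is to rethrow unchecked throwables as they are and wrap only checked ones. The sketch below assumes a hypothetical `ErrorCollectorSketch` that mirrors the `error`/`errorQueue` fields of `TestingLeaderBase`. It is not the actual test utility.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical sketch mirroring the error fields in TestingLeaderBase. */
class ErrorCollectorSketch {
    private volatile Throwable error;
    private final Queue<Throwable> errorQueue = new ConcurrentLinkedQueue<>();

    void setError(Throwable t) {
        error = t;
    }

    void addError(Throwable t) {
        errorQueue.offer(t);
    }

    /**
     * Rethrows a collected error without a {@code throws Throwable} clause:
     * RuntimeExceptions and Errors pass through, checked ones are wrapped.
     */
    void throwErrorIfPresent() {
        // poll() returns null on an empty queue, avoiding an isEmpty()/poll() race.
        final Throwable t = error != null ? error : errorQueue.poll();
        if (t == null) {
            return;
        }
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        }
        if (t instanceof Error) {
            throw (Error) t;
        }
        throw new RuntimeException("Error caught during test execution.", t);
    }
}
```

Passing `Error` through unchanged keeps `AssertionError`s from background threads recognizable as test failures.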



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -32,49 +31,73 @@
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} if and only if the
+     * leadership is acquired by this service. {@code issuedLeaderSessionID} being {@code null}
+     * indicates that this service isn't the leader right now (i.e. {@link
+     * #onGrantLeadership(UUID)}) wasn't called, yet (independently of what {@code
+     * leaderElectionDriver#hasLeadership()} returns).
+     */
     @GuardedBy("lock")
     @Nullable
-    private volatile UUID issuedLeaderSessionID;
+    private UUID issuedLeaderSessionID;
 
+    /**
+     * Saves the leader information for a registered {@link LeaderContender} after this contender
+     * confirmed the leadership.
+     */
     @GuardedBy("lock")
-    private volatile LeaderInformation confirmedLeaderInformation;
+    private LeaderInformation confirmedLeaderInformation;
 
+    /**
+     * {@code leaderElectionDriver} being {@code null} indicates that the connection to the
+     * LeaderElection backend isn't established, yet. See {@link #startLeaderElectionBackend()} and
+     * {@link #close()} for lifecycle management. The lifecycle of the driver should have been
+     * established before registering a {@link LeaderContender} and stopped after the contender has
+     * been removed.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    private volatile boolean running;
+    private volatile LeaderElectionDriver leaderElectionDriver;
 
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private LeaderElectionDriver leaderElectionDriver;
-
-    private final ExecutorService leadershipOperationExecutor;
+    @Nullable private ExecutorService leadershipOperationExecutor;

Review Comment:
   Instead of making this nullable, how about checking `isShutdown`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -270,65 +390,73 @@ public void onLeaderInformationChange(LeaderInformation leaderInformation) {
         runInLeaderEventThread(() -> onLeaderInformationChangeInternal(leaderInformation));
     }
 
+    @GuardedBy("lock")
     private void onLeaderInformationChangeInternal(LeaderInformation leaderInformation) {
-        synchronized (lock) {
-            if (running) {
-                LOG.trace(
-                        "Leader node changed while {} is the leader with session ID {}. New leader information {}.",
-                        leaderContender.getDescription(),
-                        confirmedLeaderInformation.getLeaderSessionID(),
-                        leaderInformation);
-                if (!confirmedLeaderInformation.isEmpty()) {
-                    final LeaderInformation confirmedLeaderInfo = this.confirmedLeaderInformation;
-                    if (leaderInformation.isEmpty()) {
-                        LOG.debug(
-                                "Writing leader information by {} since the external storage is empty.",
-                                leaderContender.getDescription());
-                        leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
-                    } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
-                        // the data field does not correspond to the expected leader information
-                        LOG.debug(
-                                "Correcting leader information by {}.",
-                                leaderContender.getDescription());
-                        leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
-                    }
+        if (leaderContender != null) {
+            LOG.trace(
+                    "Leader node changed while {} is the leader with {}. New leader information {}.",
+                    leaderContender.getDescription(),
+                    LeaderElectionUtils.convertToString(confirmedLeaderInformation),
+                    LeaderElectionUtils.convertToString(leaderInformation));
+            if (!confirmedLeaderInformation.isEmpty()) {
+                final LeaderInformation confirmedLeaderInfo = this.confirmedLeaderInformation;
+                if (leaderInformation.isEmpty()) {
+                    LOG.debug(
+                            "Writing leader information by {} since the external storage is empty.",
+                            leaderContender.getDescription());
+                    leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
+                } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
+                    // the data field does not correspond to the expected leader information
+                    LOG.debug(
+                            "Correcting leader information by {}.",
+                            leaderContender.getDescription());
+                    leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
                 }
-            } else {
-                LOG.debug(
-                        "Ignoring change notification since the {} has already been closed.",
-                        leaderElectionDriver);
             }
+        } else {
+            LOG.debug(
+                    "Ignoring change notification since the {} has already been stopped.",
+                    leaderElectionDriver);
         }
     }
 
     private void runInLeaderEventThread(Runnable callback) {
-        if (running) {
+        if (leadershipOperationExecutor != null) {

Review Comment:
   Is this safe? This field isn't volatile, nor guarded by a lock.
   
   What prevents a call from crashing the JVM if close is called between this condition and runAsync?
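The two remarks in this review, checking `isShutdown()` instead of a `@Nullable` executor field and the unguarded check-then-act in `runInLeaderEventThread`, could both be addressed by keeping the executor `final` and doing the shutdown check and the submission under the same lock that `close()` holds while shutting down. For reference: an executor from `Executors.newSingleThreadExecutor()` rejects tasks submitted after `shutdown()` with a `RejectedExecutionException`, and with a field that is set to `null` on close, the same race would surface as a `NullPointerException`. A minimal sketch, assuming a hypothetical `LockedDispatchSketch` class rather than the actual `DefaultLeaderElectionService`:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch; not the actual DefaultLeaderElectionService. */
class LockedDispatchSketch implements AutoCloseable {
    private final Object lock = new Object();

    // Never null: "closed" is expressed through isShutdown() instead.
    private final ExecutorService leadershipOperationExecutor =
            Executors.newSingleThreadExecutor();

    /** Returns false if the event was dropped because the service is closed. */
    boolean runInLeaderEventThread(Runnable callback) {
        synchronized (lock) {
            // Check and submit under the same lock as shutdown() in close(),
            // so close() cannot run between the two steps.
            if (leadershipOperationExecutor.isShutdown()) {
                return false;
            }
            // Only enqueues; the callback runs later on the executor thread.
            leadershipOperationExecutor.execute(callback);
            return true;
        }
    }

    @Override
    public void close() {
        synchronized (lock) {
            leadershipOperationExecutor.shutdown();
        }
        // Wait outside the lock: queued callbacks may themselves need `lock`.
        try {
            leadershipOperationExecutor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because `shutdown()` still runs already-queued tasks, events accepted before `close()` are processed, and everything after it is dropped deterministically.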




[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169874448


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -70,22 +77,37 @@ public LeaderInformation getLeaderInformation() {
     }
 
     public void isLeader() {
+        isLeader(FutureUtils.completedVoidFuture());
+    }
+
+    public void isLeader(CompletableFuture<Void> grantLeadershipFuture) {
         synchronized (lock) {
             isLeader.set(true);
-            leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
+            grantLeadershipFuture.thenRun(
+                    () -> leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID()));

Review Comment:
   That's related to your [other comment](https://github.com/apache/flink/pull/22380#discussion_r1168771139). I'm inclined to halt this PR and create a separate ticket that moves the thread handling from `DefaultMultipleComponentLeaderElectionService` into the `DefaultLeaderElectionService`
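The `isLeader(CompletableFuture<Void>)` overload in the diff above separates two steps: the driver's own leadership flag flips immediately, while `onGrantLeadership` is only delivered once the future completes. This is what lets `testDelayedGrantCallAfterContenderRegistration` register a contender in between. A minimal sketch, assuming a hypothetical `DeferredGrantDriverSketch` that takes the grant callback as a `Consumer<UUID>` instead of a `LeaderElectionEventHandler`:

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical sketch of a test driver with a deferrable grant callback. */
class DeferredGrantDriverSketch {
    private final AtomicBoolean isLeader = new AtomicBoolean();
    private final Consumer<UUID> onGrantLeadership;

    DeferredGrantDriverSketch(Consumer<UUID> onGrantLeadership) {
        this.onGrantLeadership = onGrantLeadership;
    }

    void isLeader(CompletableFuture<Void> grantLeadershipFuture) {
        // The driver considers itself leader right away...
        isLeader.set(true);
        // ...but the grant is forwarded only when the future completes. thenRun
        // runs the action on the calling thread if the future is already done,
        // otherwise on the thread that later completes it.
        grantLeadershipFuture.thenRun(() -> onGrantLeadership.accept(UUID.randomUUID()));
    }

    boolean hasLeadership() {
        return isLeader.get();
    }
}
```

Passing an already-completed future reproduces the old synchronous behaviour, which is what the no-argument `isLeader()` in the diff does via `FutureUtils.completedVoidFuture()`.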




[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169775001


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -69,6 +71,118 @@ void testOnGrantAndRevokeLeadership() throws Exception {
         };
     }
 
+    @Test
+    void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            final CompletableFuture<Void> grantLeadershipFuture = new CompletableFuture<>();
+            driver.isLeader(grantLeadershipFuture);
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            assertThat(testInstance.getLeaderSessionID())
+                    .as("Leadership grant was not forwarded to the contender, yet.")
+                    .isNull();
+
+            grantLeadershipFuture.complete(null);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * Test to cover the issue described in FLINK-31814. This test could be removed after
+     * FLINK-31814 is resolved.
+     */
+    @Test
+    void testOnRevokeCallWhileClosingService() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
+                        LeaderElectionEventHandler::onRevokeLeadership);
+
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            driver.isLeader();
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * This issue can happen when the shutdown of the contender takes too long and the leadership is
+     * re-acquired in the meantime (see FLINK-29234).

Review Comment:
   You're right: It doesn't really add much value. I added the test when noticing a test failure in [JobMasterServiceLeadershipRunnerTest.testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip](https://github.com/apache/flink/pull/21137/files#diff-c010bcd788571261ea6c43a17ad7aaea719a64a7b8a080d76939e7dbbc5a4b32). But the test scenario of `testStopWhileHavingLeadership` is actually more general and doesn't require additional JavaDoc pointing to FLINK-29234. I'm going to remove the JavaDoc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169042058


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -70,22 +77,37 @@ public LeaderInformation getLeaderInformation() {
     }
 
     public void isLeader() {
+        isLeader(FutureUtils.completedVoidFuture());
+    }
+
+    public void isLeader(CompletableFuture<Void> grantLeadershipFuture) {
         synchronized (lock) {
             isLeader.set(true);
-            leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
+            grantLeadershipFuture.thenRun(
+                    () -> leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID()));

Review Comment:
   This actually simulates what we already use in `DefaultLeaderElectionService` in combination with the `MultipleComponentLeaderElectionDriver` implementations. They call `MultipleComponentLeaderElectionDriver.Listener.isLeader()`, which triggers [DefaultMultipleComponentLeaderElectionService.isLeader()](https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L188). That method calls `onGrantLeadership` for each `LeaderElectionEventHandler` (see [DefaultMultipleComponentLeaderElectionService:202](https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L202)). These calls are executed in a separate executor (see [DefaultMultipleComponentLeaderElectionService:225](https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L225)).
   
   The order of the grant/revoke calls (which is the important bit) is ensured by the single-thread executor (see [DefaultMultipleComponentLeaderElectionService:98](https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L98)). For test cases that use this feature, the test implementation has to ensure the right order of calls.
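A minimal sketch of the ordering argument above, assuming a simplified dispatcher rather than Flink's actual classes: `OrderedLeadershipDispatcher` and its nested `EventHandler` are hypothetical stand-ins for `DefaultMultipleComponentLeaderElectionService` and `LeaderElectionEventHandler`. Because every callback is queued on one single-thread executor, grant/revoke notifications leave the driver's thread but are still observed in submission order.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical dispatcher: forwards leadership events to handlers on a single thread. */
class OrderedLeadershipDispatcher implements AutoCloseable {

    /** Hypothetical stand-in for LeaderElectionEventHandler (not Flink's interface). */
    interface EventHandler {
        void onGrantLeadership(String sessionId);

        void onRevokeLeadership();
    }

    // One worker thread with a FIFO queue: tasks run one at a time, in submission order.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final List<EventHandler> handlers = new CopyOnWriteArrayList<>();

    void register(EventHandler handler) {
        handlers.add(handler);
    }

    /** Called by the (simulated) driver; the handler calls are queued, not run inline. */
    void isLeader(String sessionId) {
        for (EventHandler handler : handlers) {
            executor.execute(() -> handler.onGrantLeadership(sessionId));
        }
    }

    void notLeader() {
        for (EventHandler handler : handlers) {
            executor.execute(handler::onRevokeLeadership);
        }
    }

    /** Drains the queue so that all submitted callbacks have run before returning. */
    @Override
    public void close() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A test that bypasses such an executor (for example, by completing futures by hand) no longer gets this ordering for free, which is why the comment says the test has to ensure it.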





[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1534661805

   Rebased onto the most recent version of FLINK-31838 (PR #22422).




[GitHub] [flink] reswqa commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1168903359


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} iff the leadership
+     * is acquired by this service. {@code issuedLeaderSessionID} being {@code null} indicates that
+     * this service isn't the leader right now (i.e. {@code
+     * leaderElectionDriver.hasLeadership(UUID)} would return {@code false} for any session ID.
+     */
     @GuardedBy("lock")
     @Nullable
-    private volatile UUID issuedLeaderSessionID;
+    private UUID issuedLeaderSessionID;
 
+    /**
+     * Saves the leader information for a registered {@link LeaderContender} after this contender
+     * confirmed the leadership.
+     */
     @GuardedBy("lock")
-    private volatile LeaderInformation confirmedLeaderInformation;
+    private LeaderInformation confirmedLeaderInformation;
 
+    /**
+     * {@code leaderElectionDriver} being {@code null} indicates that the connection to the
+     * LeaderElection backend isn't established, yet. See {@link #startLeaderElectionBackend()} and
+     * {@link #close()} for lifecycle management. The lifecycle of the driver should have been
+     * established before registering a {@link LeaderContender} and stopped after the contender has
+     * been removed.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    private volatile boolean running;
-
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private LeaderElectionDriver leaderElectionDriver;
+    private volatile LeaderElectionDriver leaderElectionDriver;

Review Comment:
   It seems that this field is guarded by `lock`. Why does it still need to be `volatile`?
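A minimal sketch of why `volatile` is redundant here, assuming every read and write of the field goes through the same monitor (which is what `@GuardedBy("lock")` documents). `LockGuardedHolder` is a hypothetical class, not part of Flink. Under the Java memory model, releasing a monitor happens-before every later acquisition of that same monitor, so a write made inside `synchronized (lock)` is visible to the next reader that also synchronizes on `lock`. `volatile` would only matter if some code path read the field without holding the lock.

```java
/** Hypothetical holder whose field is only ever accessed while holding {@code lock}. */
class LockGuardedHolder {
    private final Object lock = new Object();

    // Guarded by "lock"; since no access bypasses the lock, volatile adds no visibility guarantee.
    private String driver;

    void set(String newDriver) {
        synchronized (lock) {
            driver = newDriver;
        }
    }

    String get() {
        synchronized (lock) {
            return driver;
        }
    }
}
```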



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} iff the leadership
+     * is acquired by this service. {@code issuedLeaderSessionID} being {@code null} indicates that
+     * this service isn't the leader right now (i.e. {@code

Review Comment:
   Unpaired parenthesis: the closing `)` seems to be missing.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -257,7 +360,7 @@ public void onLeaderInformationChange(LeaderInformation leaderInformation) {
                 }
             } else {
                 LOG.debug(
-                        "Ignoring change notification since the {} has " + "already been closed.",
+                        "Ignoring change notification since the {} has already been closed.",

Review Comment:
   ```suggestion
                           "Ignoring change notification since the {} has already been stopped.",
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -145,96 +222,122 @@ public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
                 } else {
                     LOG.warn(
                             "The leader session ID {} was confirmed even though the "
-                                    + "corresponding JobManager was not elected as the leader.",
+                                    + "corresponding service was not elected as the leader or has been stopped already.",
                             leaderSessionID);
                 }
             }
         }
     }
 
+    @GuardedBy("lock")
+    private boolean hasLeadership() {
+        return leaderElectionDriver.hasLeadership() && issuedLeaderSessionID != null;
+    }
+
     @Override
     public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
         synchronized (lock) {
-            if (running) {
-                return leaderElectionDriver.hasLeadership()
-                        && leaderSessionId.equals(issuedLeaderSessionID);
+            if (leaderElectionDriver != null) {
+                if (leaderContender != null) {
+                    return hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
+                } else {
+                    LOG.debug(
+                            "hasLeadership is called after the LeaderContender was removed, returning false.");

Review Comment:
   nit: maybe `hasLeadership is called after the service is stopped, returning false.`
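To make the distinction behind this nit concrete, here is a simplified model of the two-phase lifecycle introduced by the PR: the backend is started and closed independently of registering and stopping the contender. `TwoPhaseLifecycleModel` is hypothetical and not Flink's `DefaultLeaderElectionService`. Its `start()` takes no contender, and it omits the driver's own `hasLeadership()` check. It only illustrates which state each debug message would describe: a removed contender corresponds to a *stopped* service, a missing driver to a *closed* one.

```java
import java.util.UUID;

/** Hypothetical model of the split lifecycle: backend (driver) vs. registered contender. */
class TwoPhaseLifecycleModel {
    private final Object lock = new Object();
    private boolean backendStarted; // models "leaderElectionDriver != null"
    private boolean contenderRegistered; // models "leaderContender != null"
    private UUID issuedLeaderSessionID;
    private String lastDebugMessage;

    void startLeaderElectionBackend() {
        synchronized (lock) {
            backendStarted = true;
        }
    }

    void start() {
        synchronized (lock) {
            contenderRegistered = true;
        }
    }

    void stop() {
        synchronized (lock) {
            contenderRegistered = false;
        }
    }

    void close() {
        synchronized (lock) {
            backendStarted = false;
        }
    }

    void onGrantLeadership(UUID sessionId) {
        synchronized (lock) {
            issuedLeaderSessionID = sessionId;
        }
    }

    boolean hasLeadership(UUID sessionId) {
        synchronized (lock) {
            if (!backendStarted) {
                lastDebugMessage = "hasLeadership is called after the service is closed, returning false.";
                return false;
            }
            if (!contenderRegistered) {
                lastDebugMessage = "hasLeadership is called after the service is stopped, returning false.";
                return false;
            }
            // The real implementation also consults the driver's hasLeadership(); omitted here.
            return sessionId.equals(issuedLeaderSessionID);
        }
    }

    String lastDebugMessage() {
        synchronized (lock) {
            return lastDebugMessage;
        }
    }
}
```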



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java:
##########
@@ -302,6 +303,8 @@ void testZooKeeperReelectionWithReplacement() throws Exception {
 
                     // stop leader election service = revoke leadership
                     leaderElectionService[index].stop();
+                    leaderElectionService[index].close();

Review Comment:
   Do we also need to close it in the `finally` block?
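Since the diff makes `DefaultLeaderElectionService` implement `AutoCloseable`, a single instance can be managed with try-with-resources, as `testOnRevokeCallWhileClosingService` earlier in this section does. For an array of services as in `testZooKeeperReelectionWithReplacement`, one option is to close every element in a `finally` block. The sketch below uses a hypothetical `FakeElectionService` and assumes `close()` is idempotent, because the test body may already have closed some instances itself.

```java
/** Hypothetical stand-in for a closeable election service; not a Flink class. */
class FakeElectionService implements AutoCloseable {
    private boolean closed;

    @Override
    public void close() {
        closed = true; // assumed idempotent: closing an already-closed service is harmless
    }

    boolean isClosed() {
        return closed;
    }
}

class CloseInFinallyExample {

    /** Runs the test body and closes every non-null service afterwards, even if the body fails. */
    static void runWithCleanup(FakeElectionService[] services, Runnable testBody) {
        try {
            testBody.run();
        } finally {
            for (FakeElectionService service : services) {
                if (service != null) {
                    service.close();
                }
            }
        }
    }
}
```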



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -145,96 +222,122 @@ public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
                 } else {
                     LOG.warn(
                             "The leader session ID {} was confirmed even though the "
-                                    + "corresponding JobManager was not elected as the leader.",
+                                    + "corresponding service was not elected as the leader or has been stopped already.",
                             leaderSessionID);
                 }
             }
         }
     }
 
+    @GuardedBy("lock")
+    private boolean hasLeadership() {
+        return leaderElectionDriver.hasLeadership() && issuedLeaderSessionID != null;
+    }
+
     @Override
     public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
         synchronized (lock) {
-            if (running) {
-                return leaderElectionDriver.hasLeadership()
-                        && leaderSessionId.equals(issuedLeaderSessionID);
+            if (leaderElectionDriver != null) {
+                if (leaderContender != null) {
+                    return hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
+                } else {
+                    LOG.debug(
+                            "hasLeadership is called after the LeaderContender was removed, returning false.");
+                    return false;
+                }
             } else {
-                LOG.debug("hasLeadership is called after the service is stopped, returning false.");
+                LOG.debug("hasLeadership is called after the service is closed, returning false.");
                 return false;
             }
         }
     }
 
-    /**
-     * Returns the current leader session ID or null, if the contender is not the leader.
-     *
-     * @return The last leader session ID or null, if the contender is not the leader
-     */
+    /** Returns the current leader session ID or {@code null}, if the session wasn't confirmed. */
     @VisibleForTesting
     @Nullable
     public UUID getLeaderSessionID() {
-        return confirmedLeaderInformation.getLeaderSessionID();
-    }
-
-    @GuardedBy("lock")
-    private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
-        confirmedLeaderInformation = LeaderInformation.known(leaderSessionID, leaderAddress);
-        leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
+        synchronized (lock) {
+            return confirmedLeaderInformation.getLeaderSessionID();
+        }
     }
 
     @Override
     public void onGrantLeadership(UUID newLeaderSessionId) {
+        Preconditions.checkNotNull(newLeaderSessionId);
+
         synchronized (lock) {
-            if (running) {
-                issuedLeaderSessionID = newLeaderSessionId;
-                confirmedLeaderInformation = LeaderInformation.empty();
+            Preconditions.checkState(
+                    issuedLeaderSessionID == null,
+                    "The leadership should have been granted while not having the leadership acquired.");
 
-                LOG.debug(
-                        "Grant leadership to contender {} with session ID {}.",
-                        leaderContender.getDescription(),
-                        issuedLeaderSessionID);
+            issuedLeaderSessionID = newLeaderSessionId;
 
-                leaderContender.grantLeadership(issuedLeaderSessionID);
+            if (leaderContender != null) {
+                notifyLeaderContenderOfLeadership();
             } else {
                 LOG.debug(
-                        "Ignoring the grant leadership notification since the {} has already been closed.",
+                        "The grant leadership notification is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.",
                         leaderElectionDriver);
             }
         }
     }
 
+    @GuardedBy("lock")
+    private void notifyLeaderContenderOfLeadership() {
+        Preconditions.checkState(
+                confirmedLeaderInformation.isEmpty(),
+                "The leadership should have been granted while not having the leadership acquired.");
+
+        LOG.debug(
+                "Granting leadership to contender {} with session ID {}.",
+                leaderContender.getDescription(),
+                issuedLeaderSessionID);
+
+        leaderContender.grantLeadership(issuedLeaderSessionID);
+    }
+
     @Override
     public void onRevokeLeadership() {
         synchronized (lock) {
-            if (running) {
-                handleLeadershipLoss();
+            // TODO: FLINK-31814 covers adding this Precondition
+            // Preconditions.checkState(issuedLeaderSessionID != null,"The leadership should have
+            // been revoked while having the leadership acquired.");
+
+            final UUID previousSessionID = issuedLeaderSessionID;
+            issuedLeaderSessionID = null;
+
+            if (leaderContender != null) {
+                notifyLeaderContenderOfLeadershipLoss();
             } else {
                 LOG.debug(
-                        "Ignoring the revoke leadership notification since the {} "
-                                + "has already been closed.",
+                        "The revoke leadership for session {} notification is not forwarded because the DefaultLeaderElectionService({}) has no contender registered.",
+                        previousSessionID,
                         leaderElectionDriver);
             }
         }
     }
 
     @GuardedBy("lock")
-    private void handleLeadershipLoss() {
-        LOG.debug(
-                "Revoke leadership of {} ({}@{}).",
-                leaderContender.getDescription(),
-                confirmedLeaderInformation.getLeaderSessionID(),
-                confirmedLeaderInformation.getLeaderAddress());
+    private void notifyLeaderContenderOfLeadershipLoss() {
+        if (confirmedLeaderInformation.isEmpty()) {
+            LOG.debug(
+                    "Revoking leadership to contender {} while a previous leadership grant wasn't confirmed, yet.",
+                    leaderContender.getDescription());
+        } else {
+            LOG.debug(
+                    "Revoking leadership to contender {} for {}.",
+                    leaderContender.getDescription(),
+                    LeaderElectionUtils.convertToString(confirmedLeaderInformation));
+        }
 
-        issuedLeaderSessionID = null;
         confirmedLeaderInformation = LeaderInformation.empty();
-
         leaderContender.revokeLeadership();
     }
 
     @Override
     public void onLeaderInformationChange(LeaderInformation leaderInformation) {
         synchronized (lock) {
-            if (running) {
+            if (leaderContender != null) {
                 LOG.trace(
                         "Leader node changed while {} is the leader with session ID {}. New leader information {}.",

Review Comment:
   Maybe we can use `LeaderElectionUtils.convertToString(leaderInformation)` instead of `leaderInformation` as the format parameter.





[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169800445


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} iff the leadership
+     * is acquired by this service. {@code issuedLeaderSessionID} being {@code null} indicates that
+     * this service isn't the leader right now (i.e. {@code
+     * leaderElectionDriver.hasLeadership(UUID)} would return {@code false} for any session ID.
+     */
     @GuardedBy("lock")
     @Nullable
-    private volatile UUID issuedLeaderSessionID;
+    private UUID issuedLeaderSessionID;
 
+    /**
+     * Saves the leader information for a registered {@link LeaderContender} after this contender
+     * confirmed the leadership.
+     */
     @GuardedBy("lock")
-    private volatile LeaderInformation confirmedLeaderInformation;
+    private LeaderInformation confirmedLeaderInformation;
 
+    /**
+     * {@code leaderElectionDriver} being {@code null} indicates that the connection to the
+     * LeaderElection backend isn't established, yet. See {@link #startLeaderElectionBackend()} and
+     * {@link #close()} for lifecycle management. The lifecycle of the driver should have been
+     * established before registering a {@link LeaderContender} and stopped after the contender has
+     * been removed.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    private volatile boolean running;
-
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private LeaderElectionDriver leaderElectionDriver;
+    private volatile LeaderElectionDriver leaderElectionDriver;

Review Comment:
   Because I missed it when I removed the `volatile` keyword from all the other fields. :+1: 





[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1549377590

   @reswqa anything to add from your side? I squashed the changes into a single commit now.




[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169837248


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -145,96 +222,122 @@ public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
                 } else {
                     LOG.warn(
                             "The leader session ID {} was confirmed even though the "
-                                    + "corresponding JobManager was not elected as the leader.",
+                                    + "corresponding service was not elected as the leader or has been stopped already.",
                             leaderSessionID);
                 }
             }
         }
     }
 
+    @GuardedBy("lock")
+    private boolean hasLeadership() {
+        return leaderElectionDriver.hasLeadership() && issuedLeaderSessionID != null;
+    }
+
     @Override
     public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
         synchronized (lock) {
-            if (running) {
-                return leaderElectionDriver.hasLeadership()
-                        && leaderSessionId.equals(issuedLeaderSessionID);
+            if (leaderElectionDriver != null) {
+                if (leaderContender != null) {
+                    return hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
+                } else {
+                    LOG.debug(
+                            "hasLeadership is called after the LeaderContender was removed, returning false.");

Review Comment:
   Valid point. The log message survived the migration from the WIP branch to the PR.





[GitHub] [flink] zentol commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169834323


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -70,22 +77,37 @@ public LeaderInformation getLeaderInformation() {
     }
 
     public void isLeader() {
+        isLeader(FutureUtils.completedVoidFuture());
+    }
+
+    public void isLeader(CompletableFuture<Void> grantLeadershipFuture) {
         synchronized (lock) {
             isLeader.set(true);
-            leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
+            grantLeadershipFuture.thenRun(
+                    () -> leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID()));

Review Comment:
   If thread safety must be handled by the `LeaderElectionEventHandler`, shouldn't we document that in its Javadoc?
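The thread-safety question stems from how `thenRun` schedules its callback. The sketch below is a hypothetical demo using only the JDK, with `CompletableFuture.completedFuture(null)` standing in for Flink's `FutureUtils.completedVoidFuture()`. With an already-completed future, the callback runs immediately in the calling thread, so `isLeader()` still invokes `onGrantLeadership` while holding the driver's lock. With a pending future, the callback runs later in whichever thread completes it, outside that lock. The JDK Javadoc only says non-async dependents *may* run in the completing thread; the behavior asserted below is what the OpenJDK implementation does in this setup.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical demo of which thread executes a thenRun callback. */
class DeferredGrantDemo {

    /** Callback registered on an already-completed future, like isLeader() without arguments. */
    static String threadRunningImmediateCallback() {
        AtomicReference<String> callbackThread = new AtomicReference<>();
        CompletableFuture.completedFuture((Void) null)
                .thenRun(() -> callbackThread.set(Thread.currentThread().getName()));
        return callbackThread.get();
    }

    /** Callback registered first; another thread completes the future later. */
    static String threadRunningDeferredCallback(String completerName) {
        CompletableFuture<Void> grantLeadershipFuture = new CompletableFuture<>();
        AtomicReference<String> callbackThread = new AtomicReference<>();
        grantLeadershipFuture.thenRun(() -> callbackThread.set(Thread.currentThread().getName()));

        Thread completer = new Thread(() -> grantLeadershipFuture.complete(null), completerName);
        completer.start();
        try {
            completer.join(); // join() also makes the callback's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return callbackThread.get();
    }
}
```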





[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169887689


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -70,22 +77,37 @@ public LeaderInformation getLeaderInformation() {
     }
 
     public void isLeader() {
+        isLeader(FutureUtils.completedVoidFuture());
+    }
+
+    public void isLeader(CompletableFuture<Void> grantLeadershipFuture) {
         synchronized (lock) {
             isLeader.set(true);
-            leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
+            grantLeadershipFuture.thenRun(
+                    () -> leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID()));

Review Comment:
   I created FLINK-31878 to cover this issue.





[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1193867705


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderBase.java:
##########
@@ -80,6 +84,20 @@ public Throwable getError() {
         return error == null ? errorQueue.poll() : error;
     }
 
+    /**
+     * Method for exposing errors that were caught during the test execution and need to be exposed
+     * within the test.
+     */
+    public void throwErrorIfPresent() throws Throwable {
+        if (error != null) {
+            throw error;
+        }
+
+        if (!errorQueue.isEmpty()) {
+            throw errorQueue.poll();
+        }
+    }

Review Comment:
   Fair point. That reduces the diff for `DefaultLeaderElectionServiceTest`. :+1: I've now wrapped it into an `AssertionError`.
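The following is a sketch of what wrapping the collected error in an `AssertionError` could look like. The exact final code isn't shown in this thread, so `ErrorCollectingTestHelper` and its `handleError`/`setFatalError` methods are hypothetical. It keeps the precedence of `throwErrorIfPresent()` from the diff (the single `error` field first, then the queue) but throws an unchecked `AssertionError` with the original as its cause, so test methods no longer need `throws Throwable`.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical test helper that collects errors raised on other threads. */
class ErrorCollectingTestHelper {
    private volatile Throwable error;
    private final Queue<Throwable> errorQueue = new ConcurrentLinkedQueue<>();

    void setFatalError(Throwable t) {
        error = t;
    }

    void handleError(Throwable t) {
        errorQueue.offer(t);
    }

    /** Rethrows the first collected error wrapped in an unchecked AssertionError, if any. */
    void throwErrorIfPresent() {
        final Throwable firstError = error != null ? error : errorQueue.poll();
        if (firstError != null) {
            throw new AssertionError("An error was caught during the test execution.", firstError);
        }
    }
}
```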





[GitHub] [flink] XComp merged pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp merged PR #22380:
URL: https://github.com/apache/flink/pull/22380


[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169789210


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} iff the leadership

Review Comment:
   With "fix a type", you mean fix a typo? :thinking: The `iff` was actually intended (see [Wikipedia](https://en.wikipedia.org/wiki/If_and_only_if#:~:text=In%20logic%20and%20related%20fields,true%20or%20both%20are%20false.)). But it probably causes more confusion than it's worth, so I'm going to replace `iff`.
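For readers unfamiliar with the notation: "A iff B" means A implies B and B implies A. For two boolean conditions that is simply equality. This hypothetical sketch of the session-ID Javadoc makes the invariant explicit (the class and its fields are illustrative assumptions, not the real `DefaultLeaderElectionService`).

```java
import java.util.UUID;

/**
 * Hypothetical illustration of the Javadoc invariant:
 * issuedLeaderSessionID != null  iff  leadership is currently acquired.
 */
class SessionIdInvariant {
    private boolean driverHasLeadership;
    private UUID issuedLeaderSessionID;

    void grant(UUID sessionId) {
        driverHasLeadership = true;
        issuedLeaderSessionID = sessionId;
        checkInvariant();
    }

    void revoke() {
        driverHasLeadership = false;
        issuedLeaderSessionID = null;
        checkInvariant();
    }

    // For booleans, "a iff b" is a == b: both true or both false.
    boolean invariantHolds() {
        return driverHasLeadership == (issuedLeaderSessionID != null);
    }

    private void checkInvariant() {
        if (!invariantHolds()) {
            throw new IllegalStateException(
                    "issuedLeaderSessionID must be non-null iff leadership is acquired.");
        }
    }
}
```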



[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169784060


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -69,6 +71,118 @@ void testOnGrantAndRevokeLeadership() throws Exception {
         };
     }
 
+    @Test
+    void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            final CompletableFuture<Void> grantLeadershipFuture = new CompletableFuture<>();
+            driver.isLeader(grantLeadershipFuture);
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            assertThat(testInstance.getLeaderSessionID())
+                    .as("Leadership grant was not forwarded to the contender, yet.")
+                    .isNull();
+
+            grantLeadershipFuture.complete(null);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * Test to cover the issue described in FLINK-31814. This test could be removed after
+     * FLINK-31814 is resolved.
+     */
+    @Test
+    void testOnRevokeCallWhileClosingService() throws Exception {

Review Comment:
   I guess the description was still based on the WIP version. I updated FLINK-31814's description to match the code base of this PR.



[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169832741


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -145,96 +222,122 @@ public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
                 } else {
                     LOG.warn(
                             "The leader session ID {} was confirmed even though the "
-                                    + "corresponding JobManager was not elected as the leader.",
+                                    + "corresponding service was not elected as the leader or has been stopped already.",
                             leaderSessionID);
                 }
             }
         }
     }
 
+    @GuardedBy("lock")
+    private boolean hasLeadership() {
+        return leaderElectionDriver.hasLeadership() && issuedLeaderSessionID != null;
+    }
+
     @Override
     public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
         synchronized (lock) {
-            if (running) {
-                return leaderElectionDriver.hasLeadership()
-                        && leaderSessionId.equals(issuedLeaderSessionID);
+            if (leaderElectionDriver != null) {
+                if (leaderContender != null) {
+                    return hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
+                } else {
+                    LOG.debug(
+                            "hasLeadership is called after the LeaderContender was removed, returning false.");
+                    return false;
+                }
             } else {
-                LOG.debug("hasLeadership is called after the service is stopped, returning false.");
+                LOG.debug("hasLeadership is called after the service is closed, returning false.");
                 return false;
             }
         }
     }
 
-    /**
-     * Returns the current leader session ID or null, if the contender is not the leader.
-     *
-     * @return The last leader session ID or null, if the contender is not the leader
-     */
+    /** Returns the current leader session ID or {@code null}, if the session wasn't confirmed. */
     @VisibleForTesting
     @Nullable
     public UUID getLeaderSessionID() {
-        return confirmedLeaderInformation.getLeaderSessionID();
-    }
-
-    @GuardedBy("lock")
-    private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
-        confirmedLeaderInformation = LeaderInformation.known(leaderSessionID, leaderAddress);
-        leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
+        synchronized (lock) {
+            return confirmedLeaderInformation.getLeaderSessionID();
+        }
     }
 
     @Override
     public void onGrantLeadership(UUID newLeaderSessionId) {
+        Preconditions.checkNotNull(newLeaderSessionId);
+
         synchronized (lock) {
-            if (running) {
-                issuedLeaderSessionID = newLeaderSessionId;
-                confirmedLeaderInformation = LeaderInformation.empty();
+            Preconditions.checkState(
+                    issuedLeaderSessionID == null,
+                    "The leadership should have been granted while not having the leadership acquired.");
 
-                LOG.debug(
-                        "Grant leadership to contender {} with session ID {}.",
-                        leaderContender.getDescription(),
-                        issuedLeaderSessionID);
+            issuedLeaderSessionID = newLeaderSessionId;
 
-                leaderContender.grantLeadership(issuedLeaderSessionID);
+            if (leaderContender != null) {
+                notifyLeaderContenderOfLeadership();
             } else {
                 LOG.debug(
-                        "Ignoring the grant leadership notification since the {} has already been closed.",
+                        "The grant leadership notification is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.",
                         leaderElectionDriver);
             }
         }
     }
 
+    @GuardedBy("lock")
+    private void notifyLeaderContenderOfLeadership() {
+        Preconditions.checkState(
+                confirmedLeaderInformation.isEmpty(),
+                "The leadership should have been granted while not having the leadership acquired.");
+
+        LOG.debug(
+                "Granting leadership to contender {} with session ID {}.",
+                leaderContender.getDescription(),
+                issuedLeaderSessionID);
+
+        leaderContender.grantLeadership(issuedLeaderSessionID);
+    }
+
     @Override
     public void onRevokeLeadership() {
         synchronized (lock) {
-            if (running) {
-                handleLeadershipLoss();
+            // TODO: FLINK-31814 covers adding this Precondition
+            // Preconditions.checkState(issuedLeaderSessionID != null,"The leadership should have
+            // been revoked while having the leadership acquired.");
+
+            final UUID previousSessionID = issuedLeaderSessionID;
+            issuedLeaderSessionID = null;
+
+            if (leaderContender != null) {

Review Comment:
   No, `onRevokeLeadership` can be called without a contender being registered, because the two `on*Leadership` methods are bound to the `leaderElectionDriver`, not to the contender.
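A simplified sketch of the point above, using hypothetical types (`SketchContender`, `ContenderAgnosticElectionService`) rather than Flink's real interfaces: the driver-facing callbacks always update the session ID, a revoke arriving while no contender is registered is simply absorbed, and a contender registered later is told about leadership that was granted earlier. For brevity, the contender is called synchronously while holding the lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical contender callback interface (not Flink's LeaderContender). */
interface SketchContender {
    void grantLeadership(UUID sessionId);

    void revokeLeadership();
}

/** Driver callbacks are handled regardless of whether a contender is registered. */
class ContenderAgnosticElectionService {
    private final Object lock = new Object();
    private UUID issuedLeaderSessionID; // null iff leadership is not held
    private SketchContender contender; // null iff no contender is registered

    void onGrantLeadership(UUID sessionId) {
        synchronized (lock) {
            issuedLeaderSessionID = sessionId;
            if (contender != null) {
                contender.grantLeadership(sessionId);
            }
        }
    }

    void onRevokeLeadership() {
        synchronized (lock) {
            final UUID previousSessionID = issuedLeaderSessionID;
            issuedLeaderSessionID = null;
            // Forward only if someone is listening and leadership was actually held.
            if (contender != null && previousSessionID != null) {
                contender.revokeLeadership();
            }
        }
    }

    void register(SketchContender newContender) {
        synchronized (lock) {
            if (contender != null) {
                throw new IllegalStateException("Only one contender may be registered.");
            }
            contender = newContender;
            // Replay a grant that arrived before the contender was registered.
            if (issuedLeaderSessionID != null) {
                contender.grantLeadership(issuedLeaderSessionID);
            }
        }
    }
}

/** Records callbacks for inspection. */
class RecordingContender implements SketchContender {
    final List<String> events = new ArrayList<>();

    public void grantLeadership(UUID sessionId) {
        events.add("grant:" + sessionId);
    }

    public void revokeLeadership() {
        events.add("revoke");
    }
}
```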



[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169834833


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} iff the leadership
+     * is acquired by this service. {@code issuedLeaderSessionID} being {@code null} indicates that
+     * this service isn't the leader right now (i.e. {@code
+     * leaderElectionDriver.hasLeadership(UUID)} would return {@code false} for any session ID.
+     */
     @GuardedBy("lock")
     @Nullable
-    private volatile UUID issuedLeaderSessionID;
+    private UUID issuedLeaderSessionID;
 
+    /**
+     * Saves the leader information for a registered {@link LeaderContender} after this contender
+     * confirmed the leadership.
+     */
     @GuardedBy("lock")
-    private volatile LeaderInformation confirmedLeaderInformation;
+    private LeaderInformation confirmedLeaderInformation;
 
+    /**
+     * {@code leaderElectionDriver} being {@code null} indicates that the connection to the
+     * LeaderElection backend isn't established, yet. See {@link #startLeaderElectionBackend()} and
+     * {@link #close()} for lifecycle management. The lifecycle of the driver should have been
+     * established before registering a {@link LeaderContender} and stopped after the contender has
+     * been removed.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    private volatile boolean running;
-
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private LeaderElectionDriver leaderElectionDriver;
+    private volatile LeaderElectionDriver leaderElectionDriver;

Review Comment:
   Yes, my mistake. Thanks for pointing it out.



[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1542484791

   > Are we maybe missing some start call somewhere?
   > In one of the tests the job submission never went through, and another HA e2e test is also failing.
   
   Just to have this documented: the problem was (again) that a grant call is triggered from within the `MultipleComponentLeaderElectionDriverAdapter` while the driver is being instantiated. Initially, I only allowed runnables to be pushed to the leader event thread once the driver was initialized. This prevented the initial grant call (if leadership was already acquired) from being handled properly.
   
   I'm going to reorganize/rebase the branch and make it ready for review again.
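The failure mode described above can be reproduced in miniature. In this hypothetical sketch, a driver fires its grant callback from inside its own constructor, before the service's `driver` field has been assigned, so a guard that only forwards events once the driver is initialized drops that first grant. The real code pushes runnables to a leader event thread; this sketch runs everything synchronously, and all names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical driver that may fire a grant callback from inside its constructor. */
class EagerDriver {
    EagerDriver(boolean alreadyLeader, Runnable onGrant) {
        if (alreadyLeader) {
            onGrant.run(); // fires before the constructor has returned
        }
    }
}

/** Sketch of an event-forwarding guard that either drops or keeps the early grant. */
class DriverAwareService {
    private final boolean requireInitializedDriver;
    private EagerDriver driver; // still null while the driver constructor runs
    final List<String> handledEvents = new ArrayList<>();

    DriverAwareService(boolean requireInitializedDriver) {
        this.requireInitializedDriver = requireInitializedDriver;
    }

    void start(boolean alreadyLeader) {
        // The field is assigned only after the constructor (and its callback) completes.
        driver = new EagerDriver(alreadyLeader, () -> forward("grant"));
    }

    private void forward(String event) {
        if (requireInitializedDriver && driver == null) {
            // Buggy variant: the grant issued during driver construction is silently lost.
            return;
        }
        handledEvents.add(event);
    }
}
```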


[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1549084955

   @flinkbot run azure


[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1538288405

   Yes, I'm still investigating the issue. I created the follow-up issue FLINK-32029 to cover a case where we actually swallowed an exception in the error handling.
   
   This PR is (again) a work in progress until the test failures are resolved. I'll ping you as soon as it's in a reviewable state.


[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1549810465

   @flinkbot run azure


[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1549085766

   The e2e test failed due to some (temporary) network connectivity issues within the Docker image (see [logs](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49011&view=logs&j=bea52777-eaf8-5663-8482-18fbc3630e81&t=43ba8ce7-ebbf-57cd-9163-444305d74117&l=10378)).


[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1550867431

   @flinkbot run azure


[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169826770


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -77,22 +100,48 @@ public DefaultLeaderElectionService(LeaderElectionDriverFactory leaderElectionDr
         this.leaderElectionDriver = null;
 
         this.confirmedLeaderInformation = LeaderInformation.empty();
+    }
 
-        this.running = false;
+    /**
+     * Starts the leader election process. This method has to be called before registering a {@link
+     * LeaderContender}.
+     */
+    public void startLeaderElectionBackend() throws Exception {
+        synchronized (lock) {
+            Preconditions.checkState(
+                    leaderContender == null,
+                    "No LeaderContender should have been registered, yet.");
+
+            leaderElectionDriver =
+                    leaderElectionDriverFactory.createLeaderElectionDriver(
+                            this, new LeaderElectionFatalErrorHandler());
+
+            LOG.info("Instantiating DefaultLeaderElectionService with {}.", leaderElectionDriver);
+        }
     }
 
     @Override
     public final void start(LeaderContender contender) throws Exception {
         checkNotNull(contender, "Contender must not be null.");
-        Preconditions.checkState(leaderContender == null, "Contender was already set.");
 
         synchronized (lock) {
-            running = true;
+            Preconditions.checkState(
+                    leaderContender == null,
+                    "Only one LeaderContender is allowed to be registered to this service.");
+            Preconditions.checkState(
+                    leaderElectionDriver != null,
+                    "The DefaultLeaderElectionService should have established a connection to the backend before it's started.");
+
             leaderContender = contender;
-            leaderElectionDriver =
-                    leaderElectionDriverFactory.createLeaderElectionDriver(
-                            this, new LeaderElectionFatalErrorHandler());
-            LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
+
+            LOG.info(
+                    "LeaderContender {} has been registered for {}.",
+                    contender.getDescription(),
+                    leaderElectionDriver);
+
+            if (hasLeadership()) {
+                notifyLeaderContenderOfLeadership();

Review Comment:
   Argh, that's a good catch; I hadn't thought about it that way. I was planning to include a `singleThreadExecutor` similar to what we have in `DefaultMultipleComponentLeaderElectionService`, but hoped that it could be done in a separate PR. It looks like I have to integrate it here as well. :roll_eyes: I'm going to work on it.
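One possible shape of the `singleThreadExecutor` approach mentioned above, as a hypothetical sketch (names are illustrative, and a `Consumer<UUID>` stands in for the contender): state is still mutated under the lock, but the contender callback is handed to a single-thread executor, so it runs on another thread after the lock has been released.

```java
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical service that notifies its contender asynchronously, outside the lock. */
class ExecutorBackedElectionService implements AutoCloseable {
    private final Object lock = new Object();
    private final ExecutorService leadershipOperationExecutor =
            Executors.newSingleThreadExecutor();
    private UUID issuedLeaderSessionID;
    private Consumer<UUID> contender; // stand-in for a contender's grant callback

    void onGrantLeadership(UUID sessionId) {
        synchronized (lock) {
            issuedLeaderSessionID = sessionId;
            if (contender != null) {
                notifyContender(contender, sessionId);
            }
        }
    }

    void register(Consumer<UUID> newContender) {
        synchronized (lock) {
            if (contender != null) {
                throw new IllegalStateException("Only one contender is allowed.");
            }
            contender = newContender;
            if (issuedLeaderSessionID != null) {
                notifyContender(newContender, issuedLeaderSessionID);
            }
        }
    }

    // Called with the lock held; the callback itself runs later on the executor thread.
    private void notifyContender(Consumer<UUID> target, UUID sessionId) {
        leadershipOperationExecutor.execute(() -> target.accept(sessionId));
    }

    boolean isLockHeldByCurrentThread() {
        return Thread.holdsLock(lock);
    }

    @Override
    public void close() throws InterruptedException {
        leadershipOperationExecutor.shutdown();
        leadershipOperationExecutor.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

Keeping foreign callbacks out of `synchronized` sections avoids lock-ordering problems when a callback calls back into the service; because the executor is single-threaded, the callbacks also keep their submission order.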



[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169887689


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -70,22 +77,37 @@ public LeaderInformation getLeaderInformation() {
     }
 
     public void isLeader() {
+        isLeader(FutureUtils.completedVoidFuture());
+    }
+
+    public void isLeader(CompletableFuture<Void> grantLeadershipFuture) {
         synchronized (lock) {
             isLeader.set(true);
-            leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
+            grantLeadershipFuture.thenRun(
+                    () -> leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID()));

Review Comment:
   I created FLINK-31838 to cover this issue.
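The `isLeader(CompletableFuture)` variant in the diff defers `onGrantLeadership` via `thenRun`. Whether or not this is what FLINK-31838 tracks, it helps to know where such a callback runs: with OpenJDK, an already-completed future runs the action immediately in the calling thread (here, still inside the `synchronized` block), while a pending future runs it later in the thread that completes it, without the lock. The `CompletableFuture` Javadoc itself only promises that non-async dependent actions run in the completing thread or in another caller of a completion method. The hypothetical `DelayedGrantDriver` below makes this observable.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical test driver mirroring the isLeader(CompletableFuture) pattern:
 * the grant is only issued once the supplied future completes.
 */
class DelayedGrantDriver {
    private final Object lock = new Object();
    final AtomicReference<UUID> grantedSessionId = new AtomicReference<>();
    final AtomicReference<Boolean> lockHeldDuringGrant = new AtomicReference<>();

    void isLeader(CompletableFuture<Void> grantLeadershipFuture) {
        synchronized (lock) {
            grantLeadershipFuture.thenRun(
                    () -> {
                        // Already-complete future: runs right here, inside the synchronized block.
                        // Pending future: runs later in the completing thread, without the lock.
                        lockHeldDuringGrant.set(Thread.holdsLock(lock));
                        grantedSessionId.set(UUID.randomUUID());
                    });
        }
    }
}
```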



[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169023770


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java:
##########
@@ -772,7 +773,11 @@ void testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip
             closeAsyncCalledTrigger.await();
 
             final CheckedThread grantLeadershipThread =
-                    createCheckedThread(currentLeaderDriver::isLeader);
+                    createCheckedThread(
+                            () -> {
+                                currentLeaderDriver.notLeader();
+                                currentLeaderDriver.isLeader();

Review Comment:
   Hmpf, I remember that I put it there because of a Precondition I added to the `DefaultLeaderElectionService` to ensure that leadership can only be granted if it wasn't already acquired. I'm pretty sure that it was related to the FLINK-31814 issue. :thinking: It's annoying that I cannot reproduce it anymore. I'm not sure which bug I still had in the `DefaultLeaderElectionService` that I tried to work around with this change. Anyway, you're right: this change is not necessary. I'm going to revert it.
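A hypothetical sketch of the kind of Precondition described above: a second grant without an intermediate revoke is rejected, which is why a test that grants leadership again would first have to revoke it (the `notLeader()`/`isLeader()` pair in the diff). Names are illustrative; the check in the actual diff uses Flink's `Preconditions.checkState`.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical service that rejects a grant while leadership is already acquired. */
class StrictGrantService {
    private UUID issuedLeaderSessionID;

    void onGrantLeadership(UUID newLeaderSessionId) {
        Objects.requireNonNull(newLeaderSessionId, "newLeaderSessionId");
        if (issuedLeaderSessionID != null) {
            throw new IllegalStateException(
                    "Leadership must not be granted while it is already acquired.");
        }
        issuedLeaderSessionID = newLeaderSessionId;
    }

    void onRevokeLeadership() {
        issuedLeaderSessionID = null;
    }
}
```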



[GitHub] [flink] XComp commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169042058


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -70,22 +77,37 @@ public LeaderInformation getLeaderInformation() {
     }
 
     public void isLeader() {
+        isLeader(FutureUtils.completedVoidFuture());
+    }
+
+    public void isLeader(CompletableFuture<Void> grantLeadershipFuture) {
         synchronized (lock) {
             isLeader.set(true);
-            leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
+            grantLeadershipFuture.thenRun(
+                    () -> leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID()));

Review Comment:
   This is actually simulating what we're already using in `DefaultLeaderElectionService` in combination with the `MultipleComponentLeaderElectionDriver` implementations. They call `MultipleComponentLeaderElectionDriver.Listener.isLeader()` which triggers [DefaultMultipleComponentLeaderElectionService.isLeader()](https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L188). That method calls `onGrantLeadership` for each `LeaderElectionEventHandler` in [DefaultMultipleComponentLeaderElectionService:202](https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L202). These calls will be executed in a separate executor in [DefaultMultipleComponentLeaderElectionService:225](https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L225).
   
   The order of the grant/revoke calls (which is the important bit) is ensured by the single-thread executor (see [DefaultMultipleComponentLeaderElectionService:98](https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L98)). For test cases that use this feature, the test implementation itself has to ensure the right order of calls.
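The ordering guarantee referred to above comes from the executor itself: `Executors.newSingleThreadExecutor()` runs tasks sequentially, one at a time, taking them from a FIFO work queue. This hypothetical minimal dispatcher (not Flink's implementation) shows that grant/revoke events submitted from one thread are processed in submission order; a test driver without such an executor has to enforce that order itself.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical dispatcher that serializes leadership events on a single thread. */
class OrderedLeadershipEvents implements AutoCloseable {
    private final ExecutorService leaderEventExecutor = Executors.newSingleThreadExecutor();
    final List<String> processed = new CopyOnWriteArrayList<>();

    void isLeader(String sessionId) {
        leaderEventExecutor.execute(() -> processed.add("grant:" + sessionId));
    }

    void notLeader() {
        leaderEventExecutor.execute(() -> processed.add("revoke"));
    }

    @Override
    public void close() throws InterruptedException {
        // Already-submitted tasks still run after shutdown(); wait for them to finish.
        leaderEventExecutor.shutdown();
        if (!leaderEventExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Event executor did not terminate in time.");
        }
    }
}
```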


