Posted to issues@flink.apache.org by "reswqa (via GitHub)" <gi...@apache.org> on 2023/04/17 16:34:56 UTC

[GitHub] [flink] reswqa commented on a diff in pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and registering a contender

reswqa commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1168903359


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} iff the leadership
+     * is acquired by this service. {@code issuedLeaderSessionID} being {@code null} indicates that
+     * this service isn't the leader right now (i.e. {@code
+     * leaderElectionDriver.hasLeadership(UUID)} would return {@code false} for any session ID.
+     */
     @GuardedBy("lock")
     @Nullable
-    private volatile UUID issuedLeaderSessionID;
+    private UUID issuedLeaderSessionID;
 
+    /**
+     * Saves the leader information for a registered {@link LeaderContender} after this contender
+     * confirmed the leadership.
+     */
     @GuardedBy("lock")
-    private volatile LeaderInformation confirmedLeaderInformation;
+    private LeaderInformation confirmedLeaderInformation;
 
+    /**
+     * {@code leaderElectionDriver} being {@code null} indicates that the connection to the
+     * LeaderElection backend isn't established, yet. See {@link #startLeaderElectionBackend()} and
+     * {@link #close()} for lifecycle management. The lifecycle of the driver should have been
+     * established before registering a {@link LeaderContender} and stopped after the contender has
+     * been removed.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    private volatile boolean running;
-
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private LeaderElectionDriver leaderElectionDriver;
+    private volatile LeaderElectionDriver leaderElectionDriver;

Review Comment:
   This field is already guarded by `lock`. Why does it still need to be `volatile`?
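
For context on the `volatile` question: in the Java memory model, releasing a monitor happens-before every later acquisition of the same monitor. A field that is read and written only inside `synchronized (lock)` blocks is therefore visible across threads without `volatile`. The following is a minimal sketch of that pattern, not the Flink code; `DriverHolder` and its `String` field are hypothetical stand-ins for the service and its driver.

```java
/** Hypothetical stand-in (not Flink code): a nullable field guarded only by a lock. */
final class DriverHolder {
    private final Object lock = new Object();

    // Guarded by "lock": every read and write below happens inside synchronized (lock),
    // so the field is deliberately NOT declared volatile.
    private String driver;

    void start(String newDriver) {
        synchronized (lock) {
            if (driver != null) {
                throw new IllegalStateException("Driver was already started.");
            }
            driver = newDriver;
        }
    }

    boolean isStarted() {
        synchronized (lock) {
            return driver != null;
        }
    }

    void close() {
        synchronized (lock) {
            driver = null;
        }
    }
}
```

`volatile` would only add something if some code path read the field without holding `lock`. Whether such a path exists in the PR is exactly what the question asks.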



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} iff the leadership
+     * is acquired by this service. {@code issuedLeaderSessionID} being {@code null} indicates that
+     * this service isn't the leader right now (i.e. {@code

Review Comment:
   Unpaired symbol: the `(` before `i.e.` is never closed, so a `)` seems to be missing.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -257,7 +360,7 @@ public void onLeaderInformationChange(LeaderInformation leaderInformation) {
                 }
             } else {
                 LOG.debug(
-                        "Ignoring change notification since the {} has " + "already been closed.",
+                        "Ignoring change notification since the {} has already been closed.",

Review Comment:
   ```suggestion
                           "Ignoring change notification since the {} has already been stopped.",
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -145,96 +222,122 @@ public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
                 } else {
                     LOG.warn(
                             "The leader session ID {} was confirmed even though the "
-                                    + "corresponding JobManager was not elected as the leader.",
+                                    + "corresponding service was not elected as the leader or has been stopped already.",
                             leaderSessionID);
                 }
             }
         }
     }
 
+    @GuardedBy("lock")
+    private boolean hasLeadership() {
+        return leaderElectionDriver.hasLeadership() && issuedLeaderSessionID != null;
+    }
+
     @Override
     public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
         synchronized (lock) {
-            if (running) {
-                return leaderElectionDriver.hasLeadership()
-                        && leaderSessionId.equals(issuedLeaderSessionID);
+            if (leaderElectionDriver != null) {
+                if (leaderContender != null) {
+                    return hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
+                } else {
+                    LOG.debug(
+                            "hasLeadership is called after the LeaderContender was removed, returning false.");

Review Comment:
   nit: maybe `hasLeadership is called after the service is stopped, returning false.`



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java:
##########
@@ -302,6 +303,8 @@ void testZooKeeperReelectionWithReplacement() throws Exception {
 
                     // stop leader election service = revoke leadership
                     leaderElectionService[index].stop();
+                    leaderElectionService[index].close();

Review Comment:
   Do we also need to close it in the `finally` block?
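
To illustrate the concern: if `close()` is called only on the success path, a failing assertion earlier in the test leaves the backend connection open. The following is a minimal sketch of the `finally` variant. `ServiceSketch` and `CloseInFinallySketch` are hypothetical stand-ins and do not reflect the actual test code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a service with separate stop() and close() steps. */
final class ServiceSketch implements AutoCloseable {
    final List<String> events = new ArrayList<>();

    void stop() {
        events.add("stop");
    }

    @Override
    public void close() {
        events.add("close");
    }
}

final class CloseInFinallySketch {
    /** Runs the test body, then stops the service; close() runs even if the body throws. */
    static void runAndClose(ServiceSketch service, Runnable testBody) {
        try {
            testBody.run();
            service.stop();
        } finally {
            // Reached on both the success path and the failure path.
            service.close();
        }
    }
}
```

Because the sketch implements `AutoCloseable`, a try-with-resources statement would give the same guarantee with less code.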



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -145,96 +222,122 @@ public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
                 } else {
                     LOG.warn(
                             "The leader session ID {} was confirmed even though the "
-                                    + "corresponding JobManager was not elected as the leader.",
+                                    + "corresponding service was not elected as the leader or has been stopped already.",
                             leaderSessionID);
                 }
             }
         }
     }
 
+    @GuardedBy("lock")
+    private boolean hasLeadership() {
+        return leaderElectionDriver.hasLeadership() && issuedLeaderSessionID != null;
+    }
+
     @Override
     public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
         synchronized (lock) {
-            if (running) {
-                return leaderElectionDriver.hasLeadership()
-                        && leaderSessionId.equals(issuedLeaderSessionID);
+            if (leaderElectionDriver != null) {
+                if (leaderContender != null) {
+                    return hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
+                } else {
+                    LOG.debug(
+                            "hasLeadership is called after the LeaderContender was removed, returning false.");
+                    return false;
+                }
             } else {
-                LOG.debug("hasLeadership is called after the service is stopped, returning false.");
+                LOG.debug("hasLeadership is called after the service is closed, returning false.");
                 return false;
             }
         }
     }
 
-    /**
-     * Returns the current leader session ID or null, if the contender is not the leader.
-     *
-     * @return The last leader session ID or null, if the contender is not the leader
-     */
+    /** Returns the current leader session ID or {@code null}, if the session wasn't confirmed. */
     @VisibleForTesting
     @Nullable
     public UUID getLeaderSessionID() {
-        return confirmedLeaderInformation.getLeaderSessionID();
-    }
-
-    @GuardedBy("lock")
-    private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
-        confirmedLeaderInformation = LeaderInformation.known(leaderSessionID, leaderAddress);
-        leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
+        synchronized (lock) {
+            return confirmedLeaderInformation.getLeaderSessionID();
+        }
     }
 
     @Override
     public void onGrantLeadership(UUID newLeaderSessionId) {
+        Preconditions.checkNotNull(newLeaderSessionId);
+
         synchronized (lock) {
-            if (running) {
-                issuedLeaderSessionID = newLeaderSessionId;
-                confirmedLeaderInformation = LeaderInformation.empty();
+            Preconditions.checkState(
+                    issuedLeaderSessionID == null,
+                    "The leadership should have been granted while not having the leadership acquired.");
 
-                LOG.debug(
-                        "Grant leadership to contender {} with session ID {}.",
-                        leaderContender.getDescription(),
-                        issuedLeaderSessionID);
+            issuedLeaderSessionID = newLeaderSessionId;
 
-                leaderContender.grantLeadership(issuedLeaderSessionID);
+            if (leaderContender != null) {
+                notifyLeaderContenderOfLeadership();
             } else {
                 LOG.debug(
-                        "Ignoring the grant leadership notification since the {} has already been closed.",
+                        "The grant leadership notification is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.",
                         leaderElectionDriver);
             }
         }
     }
 
+    @GuardedBy("lock")
+    private void notifyLeaderContenderOfLeadership() {
+        Preconditions.checkState(
+                confirmedLeaderInformation.isEmpty(),
+                "The leadership should have been granted while not having the leadership acquired.");
+
+        LOG.debug(
+                "Granting leadership to contender {} with session ID {}.",
+                leaderContender.getDescription(),
+                issuedLeaderSessionID);
+
+        leaderContender.grantLeadership(issuedLeaderSessionID);
+    }
+
     @Override
     public void onRevokeLeadership() {
         synchronized (lock) {
-            if (running) {
-                handleLeadershipLoss();
+            // TODO: FLINK-31814 covers adding this Precondition
+            // Preconditions.checkState(issuedLeaderSessionID != null,"The leadership should have
+            // been revoked while having the leadership acquired.");
+
+            final UUID previousSessionID = issuedLeaderSessionID;
+            issuedLeaderSessionID = null;
+
+            if (leaderContender != null) {
+                notifyLeaderContenderOfLeadershipLoss();
             } else {
                 LOG.debug(
-                        "Ignoring the revoke leadership notification since the {} "
-                                + "has already been closed.",
+                        "The revoke leadership for session {} notification is not forwarded because the DefaultLeaderElectionService({}) has no contender registered.",
+                        previousSessionID,
                         leaderElectionDriver);
             }
         }
     }
 
     @GuardedBy("lock")
-    private void handleLeadershipLoss() {
-        LOG.debug(
-                "Revoke leadership of {} ({}@{}).",
-                leaderContender.getDescription(),
-                confirmedLeaderInformation.getLeaderSessionID(),
-                confirmedLeaderInformation.getLeaderAddress());
+    private void notifyLeaderContenderOfLeadershipLoss() {
+        if (confirmedLeaderInformation.isEmpty()) {
+            LOG.debug(
+                    "Revoking leadership to contender {} while a previous leadership grant wasn't confirmed, yet.",
+                    leaderContender.getDescription());
+        } else {
+            LOG.debug(
+                    "Revoking leadership to contender {} for {}.",
+                    leaderContender.getDescription(),
+                    LeaderElectionUtils.convertToString(confirmedLeaderInformation));
+        }
 
-        issuedLeaderSessionID = null;
         confirmedLeaderInformation = LeaderInformation.empty();
-
         leaderContender.revokeLeadership();
     }
 
     @Override
     public void onLeaderInformationChange(LeaderInformation leaderInformation) {
         synchronized (lock) {
-            if (running) {
+            if (leaderContender != null) {
                 LOG.trace(
                         "Leader node changed while {} is the leader with session ID {}. New leader information {}.",

Review Comment:
   Maybe we can pass `LeaderElectionUtils.convertToString(leaderInformation)` instead of `leaderInformation` as the format parameter.


