Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/30 21:02:12 UTC

[GitHub] [pulsar] merlimat opened a new pull request #10457: PIP-45: Revalidate leader election after session is recovered

merlimat opened a new pull request #10457:
URL: https://github.com/apache/pulsar/pull/10457


   ### Motivation
   
   The leader election component should revalidate the leadership ephemeral node once the session is re-established. 
   
   ### Modifications
   
    1. When acquiring leadership, check whether the existing node is a leftover from an older session or belongs to a different instance.
    2. Re-trigger the leader election when the session is re-established. If that fails, notify the listener that the state has changed to Following.
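
The check described in the first modification can be sketched as a small decision function. This is only an illustrative sketch, not the code from the pull request: the class `LeaderRevalidationSketch`, the enum `Decision`, and the method `decide` are hypothetical names, and the two booleans stand in for "the stored value equals our proposed value" and "the node was created by the current session".

```java
// Illustrative sketch only; all names here are hypothetical, not Pulsar API.
public class LeaderRevalidationSketch {

    enum Decision { KEEP_LEADING, DELETE_AND_REELECT, FOLLOW }

    // sameValue:     the stored value equals the value this instance proposed
    // createdBySelf: the ephemeral node was created by the current session
    static Decision decide(boolean sameValue, boolean createdBySelf) {
        if (sameValue) {
            // Our own value: valid only if it belongs to the current session.
            // A node from an older session may be about to expire.
            return createdBySelf ? Decision.KEEP_LEADING : Decision.DELETE_AND_REELECT;
        }
        // A different value created in our own session is stale and is replaced;
        // a different value from another session means another leader exists.
        return createdBySelf ? Decision.DELETE_AND_REELECT : Decision.FOLLOW;
    }

    public static void main(String[] args) {
        for (boolean sameValue : new boolean[] {true, false}) {
            for (boolean createdBySelf : new boolean[] {true, false}) {
                System.out.println("sameValue=" + sameValue
                        + " createdBySelf=" + createdBySelf
                        + " -> " + decide(sameValue, createdBySelf));
            }
        }
    }
}
```

Only the combination of a matching value and a node owned by the current session keeps leadership without touching the store; every other case either re-runs the election or settles on following.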


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat merged pull request #10457: PIP-45: Revalidate leader election after session is recovered

Posted by GitBox <gi...@apache.org>.
merlimat merged pull request #10457:
URL: https://github.com/apache/pulsar/pull/10457


   





[GitHub] [pulsar] rdhabalia commented on a change in pull request #10457: PIP-45: Revalidate leader election after session is recovered

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #10457:
URL: https://github.com/apache/pulsar/pull/10457#discussion_r625337814



##########
File path: pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
##########
@@ -233,8 +268,19 @@ public synchronized LeaderElectionState getState() {
         return cache.getIfCached(path);
     }
 
-    @Override
-    public void accept(Notification notification) {
+    private synchronized void handleSessionNotification(SessionEvent event) {
+        if (event == SessionEvent.SessionReestablished) {
+            if (leaderElectionState == LeaderElectionState.Leading) {
+                log.info("Revalidating leadership for {}", path);
+            }
+
+            elect().thenAccept(lse -> {
+                log.info("Resynced leadership for {} - State: ", path, lse);

Review comment:
       The format string is missing a `{}` placeholder for `lse`, so the state is never logged.
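
To illustrate the effect, here is a minimal, self-contained sketch. It does not use the real logging library: `PlaceholderSketch.format` is a hypothetical, simplified stand-in that mimics `{}`-style substitution, assuming (as SLF4J-style loggers do) that arguments without a matching placeholder are dropped.

```java
// Simplified stand-in for "{}" placeholder substitution; not the real logger.
public class PlaceholderSketch {

    // Replaces each "{}" in order with the next argument.
    // Arguments left over once the placeholders run out are ignored.
    static String format(String pattern, Object... args) {
        StringBuilder out = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = pattern.indexOf("{}", from);
            if (at < 0) {
                break; // no placeholder left: remaining arguments are dropped
            }
            out.append(pattern, from, at).append(arg);
            from = at + 2;
        }
        return out.append(pattern.substring(from)).toString();
    }

    public static void main(String[] args) {
        // One placeholder, two arguments: the state is lost.
        System.out.println(format("Resynced leadership for {} - State: ", "/leader", "Leading"));
        // Two placeholders, two arguments: both values appear.
        System.out.println(format("Resynced leadership for {} - State: {}", "/leader", "Leading"));
    }
}
```

With the second placeholder added, the message carries both the path and the resulting state.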







[GitHub] [pulsar] rdhabalia commented on a change in pull request #10457: PIP-45: Revalidate leader election after session is recovered

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #10457:
URL: https://github.com/apache/pulsar/pull/10457#discussion_r625340218



##########
File path: pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
##########
@@ -96,26 +99,58 @@
     private synchronized CompletableFuture<LeaderElectionState> elect() {
         // First check if there's already a leader elected
         internalState = InternalState.ElectionInProgress;
-        return cache.get(path).thenCompose(optLock -> {
+        return store.get(path).thenCompose(optLock -> {
             if (optLock.isPresent()) {
-                synchronized (LeaderElectionImpl.this) {
-                    internalState = InternalState.LeaderIsPresent;
-                    if (leaderElectionState != LeaderElectionState.Following) {
-                        leaderElectionState = LeaderElectionState.Following;
-                        try {
-                            stateChangesListener.accept(leaderElectionState);
-                        } catch (Throwable t) {
-                            log.warn("Exception in state change listener", t);
-                        }
-                    }
-                    return CompletableFuture.completedFuture(leaderElectionState);
-                }
+                return handleExistingLeaderValue(optLock.get());
             } else {
                 return tryToBecomeLeader();
             }
         });
     }
 
+    private synchronized CompletableFuture<LeaderElectionState> handleExistingLeaderValue(GetResult res) {
+        T existingValue;
+        try {
+            existingValue = serde.deserialize(res.getValue());
+        } catch (Throwable t) {
+            return FutureUtils.exception(t);
+        }
+
+        if (existingValue.equals(proposedValue.orElse(null))) {
+            // If the value is the same as our proposed value, it means this instance was the leader at some
+            // point before. The existing value can either be for this same session or for a previous one.
+            if (res.getStat().isCreatedBySelf()) {
+                // The value is still valid because it was created in the same session
+                changeState(LeaderElectionState.Leading);
+            } else {
+                // Since the value was created in a different session, it might be expiring. We need to delete it
+                // and try the election again.
+                return store.delete(path, Optional.of(res.getStat().getVersion()))
+                        .thenCompose(__ -> tryToBecomeLeader());
+            }
+        } else if (res.getStat().isCreatedBySelf()) {
+            // The existing value is different but was created from the same session
+            return store.delete(path, Optional.of(res.getStat().getVersion()))
+                    .thenCompose(__ -> tryToBecomeLeader());
+        }
+
+        // If the existing value is different, it means there's already another leader
+        changeState(LeaderElectionState.Following);
+        return CompletableFuture.completedFuture(LeaderElectionState.Following);
+    }
+
+    private synchronized void changeState(LeaderElectionState lse) {

Review comment:
       Nitpick: the parameter name should be `les` (for LeaderElectionState) rather than `lse`.



