You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/02/12 23:44:41 UTC

[ratis] branch master updated: RATIS-1779. Refactor: Reduce nesting in TransferLeadership (#820)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 18eacaed3 RATIS-1779. Refactor: Reduce nesting in TransferLeadership (#820)
18eacaed3 is described below

commit 18eacaed31e4965a9c400d86409a88fea21fc18a
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Mon Feb 13 07:44:36 2023 +0800

    RATIS-1779. Refactor: Reduce nesting in TransferLeadership (#820)
---
 .../ratis/server/impl/TransferLeadership.java      | 50 ++++++++++++----------
 1 file changed, 27 insertions(+), 23 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
index 9fe9081f8..c5c1a46cb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
@@ -94,13 +94,12 @@ public class TransferLeadership {
 
   void onFollowerAppendEntriesReply(LeaderStateImpl leaderState, FollowerInfo follower) {
     final Optional<RaftPeerId> transferee = getTransferee();
-    // If TransferLeadership is in progress, and the transferee has just append some entries
-    if (transferee.filter(t -> t.equals(follower.getId())).isPresent()) {
-      // If the transferee is up-to-date, send StartLeaderElection to it
-      if (leaderState.sendStartLeaderElection(follower)) {
-        LOG.info("{}: sent StartLeaderElection to transferee {} after received AppendEntriesResponse",
-            server.getMemberId(), transferee.get());
-      }
+    // If the transferee has just append some entries and becomes up-to-date,
+    // send StartLeaderElection to it
+    if (transferee.filter(t -> t.equals(follower.getId())).isPresent()
+        && leaderState.sendStartLeaderElection(follower)) {
+      LOG.info("{}: sent StartLeaderElection to transferee {} after received AppendEntriesResponse",
+          server.getMemberId(), transferee.get());
     }
   }
 
@@ -127,22 +126,7 @@ public class TransferLeadership {
     final MemoizedSupplier<PendingRequest> supplier = JavaUtils.memoize(() -> new PendingRequest(request));
     final PendingRequest previous = pending.getAndUpdate(f -> f != null? f: supplier.get());
     if (previous != null) {
-      if (request.getNewLeader().equals(previous.getRequest().getNewLeader())) {
-        final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
-        previous.getReplyFuture().whenComplete((r, e) -> {
-          if (e != null) {
-            replyFuture.completeExceptionally(e);
-          } else {
-            replyFuture.complete(r.isSuccess()? server.newSuccessReply(request)
-                : server.newExceptionReply(request, r.getException()));
-          }
-        });
-        return replyFuture;
-      } else {
-        final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId() +
-            "Failed to transfer leadership to " + request.getNewLeader() + ": a previous " + previous + " exists");
-        return CompletableFuture.completedFuture(server.newExceptionReply(request, tle));
-      }
+      return createReplyFutureFromPreviousRequest(request, previous);
     }
     tryTransferLeadership(leaderState, request.getNewLeader());
 
@@ -154,6 +138,26 @@ public class TransferLeadership {
     return supplier.get().getReplyFuture();
   }
 
+  private CompletableFuture<RaftClientReply> createReplyFutureFromPreviousRequest(
+      TransferLeadershipRequest request, PendingRequest previous) {
+    if (request.getNewLeader().equals(previous.getRequest().getNewLeader())) {
+      final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
+      previous.getReplyFuture().whenComplete((r, e) -> {
+        if (e != null) {
+          replyFuture.completeExceptionally(e);
+        } else {
+          replyFuture.complete(r.isSuccess() ? server.newSuccessReply(request)
+              : server.newExceptionReply(request, r.getException()));
+        }
+      });
+      return replyFuture;
+    } else {
+      final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId() +
+          "Failed to transfer leadership to " + request.getNewLeader() + ": a previous " + previous + " exists");
+      return CompletableFuture.completedFuture(server.newExceptionReply(request, tle));
+    }
+  }
+
   void finish(RaftPeerId currentLeader, boolean timeout) {
     Optional.ofNullable(pending.getAndSet(null))
         .ifPresent(r -> r.complete(currentLeader, timeout));