You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/02/12 23:44:41 UTC
[ratis] branch master updated: RATIS-1779. Refactor: Reduce nesting in TransferLeadership (#820)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 18eacaed3 RATIS-1779. Refactor: Reduce nesting in TransferLeadership (#820)
18eacaed3 is described below
commit 18eacaed31e4965a9c400d86409a88fea21fc18a
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Mon Feb 13 07:44:36 2023 +0800
RATIS-1779. Refactor: Reduce nesting in TransferLeadership (#820)
---
.../ratis/server/impl/TransferLeadership.java | 50 ++++++++++++----------
1 file changed, 27 insertions(+), 23 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
index 9fe9081f8..c5c1a46cb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
@@ -94,13 +94,12 @@ public class TransferLeadership {
void onFollowerAppendEntriesReply(LeaderStateImpl leaderState, FollowerInfo follower) {
final Optional<RaftPeerId> transferee = getTransferee();
- // If TransferLeadership is in progress, and the transferee has just append some entries
- if (transferee.filter(t -> t.equals(follower.getId())).isPresent()) {
- // If the transferee is up-to-date, send StartLeaderElection to it
- if (leaderState.sendStartLeaderElection(follower)) {
- LOG.info("{}: sent StartLeaderElection to transferee {} after received AppendEntriesResponse",
- server.getMemberId(), transferee.get());
- }
+ // If the transferee has just appended some entries and become up-to-date,
+ // send StartLeaderElection to it
+ if (transferee.filter(t -> t.equals(follower.getId())).isPresent()
+ && leaderState.sendStartLeaderElection(follower)) {
+ LOG.info("{}: sent StartLeaderElection to transferee {} after received AppendEntriesResponse",
+ server.getMemberId(), transferee.get());
}
}
@@ -127,22 +126,7 @@ public class TransferLeadership {
final MemoizedSupplier<PendingRequest> supplier = JavaUtils.memoize(() -> new PendingRequest(request));
final PendingRequest previous = pending.getAndUpdate(f -> f != null? f: supplier.get());
if (previous != null) {
- if (request.getNewLeader().equals(previous.getRequest().getNewLeader())) {
- final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
- previous.getReplyFuture().whenComplete((r, e) -> {
- if (e != null) {
- replyFuture.completeExceptionally(e);
- } else {
- replyFuture.complete(r.isSuccess()? server.newSuccessReply(request)
- : server.newExceptionReply(request, r.getException()));
- }
- });
- return replyFuture;
- } else {
- final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId() +
- "Failed to transfer leadership to " + request.getNewLeader() + ": a previous " + previous + " exists");
- return CompletableFuture.completedFuture(server.newExceptionReply(request, tle));
- }
+ return createReplyFutureFromPreviousRequest(request, previous);
}
tryTransferLeadership(leaderState, request.getNewLeader());
@@ -154,6 +138,26 @@ public class TransferLeadership {
return supplier.get().getReplyFuture();
}
+ private CompletableFuture<RaftClientReply> createReplyFutureFromPreviousRequest(
+ TransferLeadershipRequest request, PendingRequest previous) {
+ if (request.getNewLeader().equals(previous.getRequest().getNewLeader())) {
+ final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
+ previous.getReplyFuture().whenComplete((r, e) -> {
+ if (e != null) {
+ replyFuture.completeExceptionally(e);
+ } else {
+ replyFuture.complete(r.isSuccess() ? server.newSuccessReply(request)
+ : server.newExceptionReply(request, r.getException()));
+ }
+ });
+ return replyFuture;
+ } else {
+ final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId() +
+ "Failed to transfer leadership to " + request.getNewLeader() + ": a previous " + previous + " exists");
+ return CompletableFuture.completedFuture(server.newExceptionReply(request, tle));
+ }
+ }
+
void finish(RaftPeerId currentLeader, boolean timeout) {
Optional.ofNullable(pending.getAndSet(null))
.ifPresent(r -> r.complete(currentLeader, timeout));