You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/12/23 09:29:07 UTC

[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #372: RATIS-1260. Implement transferLeaderShip in server

szetszwo commented on a change in pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#discussion_r547844071



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
##########
@@ -145,6 +145,17 @@ boolean containsInConf(RaftPeerId peerId) {
     return conf.contains(peerId);
   }
 
+  boolean isHighestPriority(RaftPeerId peerId) {
+    RaftPeer target = getPeer(peerId);
+    Collection<RaftPeer> peers = getCurrentPeers();
+    for (RaftPeer peer : peers) {
+      if (peer.getPriority() >= target.getPriority() && !peer.equals(target)) {

Review comment:
       Should the `>=` be `>`?

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -866,18 +880,112 @@ RaftClientReply waitForReply(RaftClientRequest request, CompletableFuture<RaftCl
 
   @Override
   public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
-    //TODO(runzhiwang): implement transfer leadership in server
-    return null;
+    return waitForReply(request, transferLeadershipAsync(request));
+  }
+
+  private CompletableFuture<RaftClientReply> logAndReturnTransferLeadershipFail(
+      TransferLeadershipRequest request, String msg) {
+    LOG.warn(msg);
+    return CompletableFuture.completedFuture(newExceptionReply(request, new StateMachineException(msg)));
+  }
+
+  boolean isSteppingDown() {
+    return finishTransferLeader != null;
+  }
+
+  private Consumer<RaftPeerId> finishTransferLeader;
+
+  public Consumer<RaftPeerId> finishTransferLeader() {
+    return finishTransferLeader;
+  }
+
+  public Consumer<RaftPeerId> setFinishTransferLeader(Consumer<RaftPeerId> consumer) {
+    return finishTransferLeader = consumer;
+  }
+
+  private void timeoutTransferLeadership(
+      TransferLeadershipRequest request, CompletableFuture<RaftClientReply> replyFuture)
+      throws StateMachineException {
+    synchronized (replyFuture) {

Review comment:
       It is better to use `AtomicReference` instead of `synchronized`.

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -866,18 +880,112 @@ RaftClientReply waitForReply(RaftClientRequest request, CompletableFuture<RaftCl
 
   @Override
   public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
-    //TODO(runzhiwang): implement transfer leadership in server
-    return null;
+    return waitForReply(request, transferLeadershipAsync(request));
+  }
+
+  private CompletableFuture<RaftClientReply> logAndReturnTransferLeadershipFail(
+      TransferLeadershipRequest request, String msg) {
+    LOG.warn(msg);
+    return CompletableFuture.completedFuture(newExceptionReply(request, new StateMachineException(msg)));

Review comment:
       We should add a TransferLeadershipException here. StateMachineException should come from the StateMachine.

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -866,18 +880,112 @@ RaftClientReply waitForReply(RaftClientRequest request, CompletableFuture<RaftCl
 
   @Override
   public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
-    //TODO(runzhiwang): implement transfer leadership in server
-    return null;
+    return waitForReply(request, transferLeadershipAsync(request));
+  }
+
+  private CompletableFuture<RaftClientReply> logAndReturnTransferLeadershipFail(
+      TransferLeadershipRequest request, String msg) {
+    LOG.warn(msg);
+    return CompletableFuture.completedFuture(newExceptionReply(request, new StateMachineException(msg)));
+  }
+
+  boolean isSteppingDown() {
+    return finishTransferLeader != null;
+  }
+
+  private Consumer<RaftPeerId> finishTransferLeader;
+
+  public Consumer<RaftPeerId> finishTransferLeader() {
+    return finishTransferLeader;
+  }
+
+  public Consumer<RaftPeerId> setFinishTransferLeader(Consumer<RaftPeerId> consumer) {
+    return finishTransferLeader = consumer;
+  }
+
+  private void timeoutTransferLeadership(
+      TransferLeadershipRequest request, CompletableFuture<RaftClientReply> replyFuture)
+      throws StateMachineException {
+    synchronized (replyFuture) {
+      if (replyFuture.isDone()) {
+        return;
+      }
+
+      setFinishTransferLeader(null);
+
+      if (state.getLeaderId().equals(request.getNewLeader())) {
+        replyFuture.complete(newSuccessReply(request));
+      } else {
+        StateMachineException sme = new StateMachineException("Failed to transfer leadership");
+        replyFuture.complete(newExceptionReply(request, sme));
+        throw sme;
+      }
+    }
   }
 
   @Override
   public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request)
       throws IOException {
-    //TODO(runzhiwang): implement transfer leadership in server
-    return null;
+    LOG.info("{}: receive transferLeadership {}", getMemberId(), request);
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    assertGroup(request.getRequestorId(), request.getRaftGroupId());
+
+    synchronized (this) {
+      CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, false);
+      if (reply != null) {
+        return reply;
+      }
+
+      if (getId().equals(request.getNewLeader())) {
+        return CompletableFuture.completedFuture(newSuccessReply(request));
+      }
+
+      final RaftConfigurationImpl conf = getRaftConf();
+      final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
+
+      // make sure there is no raft reconfiguration in progress
+      if (!conf.isStable() || leaderState.inStagingState() || !state.isConfCommitted()) {
+        String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
+            " when raft reconfiguration in progress.";
+        return logAndReturnTransferLeadershipFail(request, msg);
+      }
+
+      if (!conf.containsInConf(request.getNewLeader())) {
+        String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
+            " as it is not in " + conf;
+        return logAndReturnTransferLeadershipFail(request, msg);
+      }
+
+      if (!conf.isHighestPriority(request.getNewLeader())) {
+        String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
+            " as it does not has highest priority " + conf;
+        return logAndReturnTransferLeadershipFail(request, msg);
+      }
+
+      CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
+
+      setFinishTransferLeader(currLeader -> {
+        synchronized (replyFuture) {
+          if (currLeader == null || replyFuture.isDone()) {
+            return;
+          }
+
+          if (currLeader.equals(request.getNewLeader())) {
+            replyFuture.complete(newSuccessReply(request));
+            setFinishTransferLeader(null);
+          }
+        }
+      });
+
+      scheduler.onTimeout(TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS),
+          () -> timeoutTransferLeadership(request, replyFuture),
+          LOG, () -> "Timeout check failed for append entry request: " + request);
+
+      return replyFuture;

Review comment:
       This part can be moved to TransferLeadership.start(TransferLeadershipRequest).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org