Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/12/23 01:50:45 UTC

[GitHub] [incubator-ratis] runzhiwang opened a new pull request #372: RATIS-1260. Implement transferLeaderShip in server

runzhiwang opened a new pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372


   ## What changes were proposed in this pull request?
   
   Implement transferLeaderShip in server
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/RATIS-1260
   
   ## How was this patch tested?
   
   TODO
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] runzhiwang edited a comment on pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
runzhiwang edited a comment on pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#issuecomment-750245608


   > Let's add new TransferLeadership class and put all the code there.
   
   @szetszwo Do I need to put the following check code in the TransferLeadership class? If so, I need to change some methods from private to public, such as `checkLeaderState`.
   ```
         CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, false);
         if (reply != null) {
           return reply;
         }
   
         if (getId().equals(request.getNewLeader())) {
           return CompletableFuture.completedFuture(newSuccessReply(request));
         }
   
         final RaftConfigurationImpl conf = getRaftConf();
         final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
   
      
         if (!conf.isStable() || leaderState.inStagingState() || !state.isConfCommitted()) {
           String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
               " when raft reconfiguration in progress.";
           return logAndReturnTransferLeadershipFail(request, msg);
         }
   
         if (!conf.containsInConf(request.getNewLeader())) {
           String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
               " as it is not in " + conf;
           return logAndReturnTransferLeadershipFail(request, msg);
         }
   
         if (!conf.isHighestPriority(request.getNewLeader())) {
           String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
               " as it does not has highest priority " + conf;
           return logAndReturnTransferLeadershipFail(request, msg);
         }
   ```





[GitHub] [incubator-ratis] runzhiwang commented on pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#issuecomment-750601995


   @szetszwo I have updated the patch. Could you help review it again?








[GitHub] [incubator-ratis] szetszwo commented on pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#issuecomment-750296198


   > Do I need to put the following check code in TransferLeadership class ? ...
   
   Let's keep them in RaftServerImpl.  Thanks.





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#discussion_r547923629



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -866,18 +880,112 @@ RaftClientReply waitForReply(RaftClientRequest request, CompletableFuture<RaftCl
 
   @Override
   public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
-    //TODO(runzhiwang): implement transfer leadership in server
-    return null;
+    return waitForReply(request, transferLeadershipAsync(request));
+  }
+
+  private CompletableFuture<RaftClientReply> logAndReturnTransferLeadershipFail(
+      TransferLeadershipRequest request, String msg) {
+    LOG.warn(msg);
+    return CompletableFuture.completedFuture(newExceptionReply(request, new StateMachineException(msg)));
+  }
+
+  boolean isSteppingDown() {
+    return finishTransferLeader != null;
+  }
+
+  private Consumer<RaftPeerId> finishTransferLeader;
+
+  public Consumer<RaftPeerId> finishTransferLeader() {
+    return finishTransferLeader;
+  }
+
+  public Consumer<RaftPeerId> setFinishTransferLeader(Consumer<RaftPeerId> consumer) {
+    return finishTransferLeader = consumer;
+  }
+
+  private void timeoutTransferLeadership(
+      TransferLeadershipRequest request, CompletableFuture<RaftClientReply> replyFuture)
+      throws StateMachineException {
+    synchronized (replyFuture) {

Review comment:
       The `synchronized` block is there to avoid a race between threads: replyFuture is also completed in the following lambda, which is called by ServerState#setLeader. Without `synchronized`, replyFuture may be completed twice at the same time. That does not cause an error, but we should probably avoid it.
   ```
         currLeader -> {
           synchronized (replyFuture) {
             if (currLeader == null || replyFuture.isDone()) {
               return;
             }
   
             if (currLeader.equals(request.getNewLeader())) {
               replyFuture.complete(newSuccessReply(request));
               setFinishTransferLeader(null);
             }
           }
         };
   ```








[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#discussion_r548321499



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -866,18 +880,112 @@ RaftClientReply waitForReply(RaftClientRequest request, CompletableFuture<RaftCl
 
   @Override
   public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
-    //TODO(runzhiwang): implement transfer leadership in server
-    return null;
+    return waitForReply(request, transferLeadershipAsync(request));
+  }
+
+  private CompletableFuture<RaftClientReply> logAndReturnTransferLeadershipFail(
+      TransferLeadershipRequest request, String msg) {
+    LOG.warn(msg);
+    return CompletableFuture.completedFuture(newExceptionReply(request, new StateMachineException(msg)));
+  }
+
+  boolean isSteppingDown() {
+    return finishTransferLeader != null;
+  }
+
+  private Consumer<RaftPeerId> finishTransferLeader;
+
+  public Consumer<RaftPeerId> finishTransferLeader() {
+    return finishTransferLeader;
+  }
+
+  public Consumer<RaftPeerId> setFinishTransferLeader(Consumer<RaftPeerId> consumer) {
+    return finishTransferLeader = consumer;
+  }
+
+  private void timeoutTransferLeadership(
+      TransferLeadershipRequest request, CompletableFuture<RaftClientReply> replyFuture)
+      throws StateMachineException {
+    synchronized (replyFuture) {

Review comment:
       With the `AtomicReference<PendingRequest> pending`, we do not need synchronized anymore, thanks a lot.







[GitHub] [incubator-ratis] szetszwo merged pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
szetszwo merged pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372


   





[GitHub] [incubator-ratis] runzhiwang closed pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
runzhiwang closed pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372


   





[GitHub] [incubator-ratis] szetszwo commented on pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#issuecomment-749899296


   > Not sure whether TimeoutScheduler is enough, if user pass a big value of timeout, such as 100 seconds, but transferLeadership finish in 1 second, we should not return to client after 100 seconds, so maybe we have to check periodically to return to client as soon as possible.
   
   When the leader receives a request, it creates a reply future F and a timeout task T.  If the leader has successfully transferred leadership within timeout, it completes F.  When timeout occurs, T checks if F has already been completed.  If yes, T does nothing.  Otherwise, it completes F exceptionally.
   
   In an asynchronous, event-driven design, everything is triggered by an event.  The code should not periodically check whether an event has occurred.
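
The reply future F and timeout task T described above can be sketched with plain JDK classes. This is only an illustration under assumptions: `TimeoutFutureSketch`, `start`, and the `String` reply type are hypothetical stand-ins, and a `ScheduledExecutorService` takes the place of the Ratis `TimeoutScheduler`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the server-side logic; not Ratis code.
class TimeoutFutureSketch {
  // Daemon thread so that a pending timeout task does not keep the JVM alive.
  private static final ScheduledExecutorService SCHEDULER =
      Executors.newSingleThreadScheduledExecutor(r -> {
        final Thread t = new Thread(r, "timeout-sketch");
        t.setDaemon(true);
        return t;
      });

  /** Creates the reply future F and schedules the timeout task T for it. */
  static CompletableFuture<String> start(long timeoutMs) {
    final CompletableFuture<String> f = new CompletableFuture<>();
    SCHEDULER.schedule(() -> {
      // T: completeExceptionally checks and completes atomically;
      // it does nothing (returns false) if F is already completed.
      f.completeExceptionally(new TimeoutException("timed out after " + timeoutMs + " ms"));
    }, timeoutMs, TimeUnit.MILLISECONDS);
    return f;
  }

  public static void main(String[] args) {
    // A long timeout does not delay the reply: the event completes F right away.
    final CompletableFuture<String> fast = start(100_000);
    fast.complete("SUCCESS");
    System.out.println(fast.join());

    // No event arrives, so T completes F exceptionally.
    final CompletableFuture<String> slow = start(50);
    try {
      slow.join();
    } catch (CompletionException e) {
      System.out.println(e.getCause().getMessage());
    }
  }
}
```

With this shape, a 100-second timeout costs nothing when the transfer finishes in 1 second, because the client is waiting on F and not on T.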





[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#discussion_r547844071



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
##########
@@ -145,6 +145,17 @@ boolean containsInConf(RaftPeerId peerId) {
     return conf.contains(peerId);
   }
 
+  boolean isHighestPriority(RaftPeerId peerId) {
+    RaftPeer target = getPeer(peerId);
+    Collection<RaftPeer> peers = getCurrentPeers();
+    for (RaftPeer peer : peers) {
+      if (peer.getPriority() >= target.getPriority() && !peer.equals(target)) {

Review comment:
       Should it be `>`?
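
The question turns on how ties are treated. A minimal sketch of the two variants follows, assuming a simplified, hypothetical `Peer` type in place of `RaftPeer`; it does not settle which behavior the patch should adopt.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of peers and priorities; not the Ratis classes.
class PrioritySketch {
  static final class Peer {
    final String id;
    final int priority;

    Peer(String id, int priority) {
      this.id = id;
      this.priority = priority;
    }
  }

  /** `>=` variant: another peer with an equal or higher priority disqualifies the target. */
  static boolean isHighestStrict(List<Peer> peers, Peer target) {
    for (Peer p : peers) {
      if (p != target && p.priority >= target.priority) {
        return false;
      }
    }
    return true;
  }

  /** `>` variant: only a strictly higher priority disqualifies the target. */
  static boolean isHighestLenient(List<Peer> peers, Peer target) {
    for (Peer p : peers) {
      if (p != target && p.priority > target.priority) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    final Peer s1 = new Peer("s1", 1);
    final Peer s2 = new Peer("s2", 2);
    final Peer s3 = new Peer("s3", 2);  // ties with s2
    final List<Peer> peers = Arrays.asList(s1, s2, s3);

    System.out.println(isHighestStrict(peers, s2));   // false: s3 has an equal priority
    System.out.println(isHighestLenient(peers, s2));  // true: no peer is strictly higher
  }
}
```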

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -866,18 +880,112 @@ RaftClientReply waitForReply(RaftClientRequest request, CompletableFuture<RaftCl
 
   @Override
   public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
-    //TODO(runzhiwang): implement transfer leadership in server
-    return null;
+    return waitForReply(request, transferLeadershipAsync(request));
+  }
+
+  private CompletableFuture<RaftClientReply> logAndReturnTransferLeadershipFail(
+      TransferLeadershipRequest request, String msg) {
+    LOG.warn(msg);
+    return CompletableFuture.completedFuture(newExceptionReply(request, new StateMachineException(msg)));
+  }
+
+  boolean isSteppingDown() {
+    return finishTransferLeader != null;
+  }
+
+  private Consumer<RaftPeerId> finishTransferLeader;
+
+  public Consumer<RaftPeerId> finishTransferLeader() {
+    return finishTransferLeader;
+  }
+
+  public Consumer<RaftPeerId> setFinishTransferLeader(Consumer<RaftPeerId> consumer) {
+    return finishTransferLeader = consumer;
+  }
+
+  private void timeoutTransferLeadership(
+      TransferLeadershipRequest request, CompletableFuture<RaftClientReply> replyFuture)
+      throws StateMachineException {
+    synchronized (replyFuture) {

Review comment:
       It is better to use `AtomicReference` instead of `synchronized`.
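
One way to read this suggestion, sketched under assumptions: keep the pending future in an `AtomicReference` and let whichever caller takes it out with `getAndSet(null)` complete it. `PendingSlotSketch` and its `String` reply type are hypothetical names for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; the names and the String reply type are illustrative only.
class PendingSlotSketch {
  private final AtomicReference<CompletableFuture<String>> pending = new AtomicReference<>();

  /** Registers a new pending future, replacing any previous one. */
  CompletableFuture<String> start() {
    final CompletableFuture<String> f = new CompletableFuture<>();
    pending.set(f);
    return f;
  }

  /**
   * Called by both the leader-change event and the timeout task.
   * getAndSet(null) hands the future to exactly one caller, so no lock is needed.
   *
   * @return true only for the caller that completed the future
   */
  boolean finish(String result) {
    final CompletableFuture<String> f = pending.getAndSet(null);
    return f != null && f.complete(result);
  }

  public static void main(String[] args) {
    final PendingSlotSketch sketch = new PendingSlotSketch();
    final CompletableFuture<String> f = sketch.start();

    System.out.println(sketch.finish("leader changed"));  // true: this caller took the future
    System.out.println(sketch.finish("timeout"));         // false: nothing is pending anymore
    System.out.println(f.join());                         // leader changed
  }
}
```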

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -866,18 +880,112 @@ RaftClientReply waitForReply(RaftClientRequest request, CompletableFuture<RaftCl
 
   @Override
   public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
-    //TODO(runzhiwang): implement transfer leadership in server
-    return null;
+    return waitForReply(request, transferLeadershipAsync(request));
+  }
+
+  private CompletableFuture<RaftClientReply> logAndReturnTransferLeadershipFail(
+      TransferLeadershipRequest request, String msg) {
+    LOG.warn(msg);
+    return CompletableFuture.completedFuture(newExceptionReply(request, new StateMachineException(msg)));

Review comment:
       We should add a TransferLeadershipException.  StateMachineException should come from StateMachine.

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -866,18 +880,112 @@ RaftClientReply waitForReply(RaftClientRequest request, CompletableFuture<RaftCl
 
   @Override
   public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
-    //TODO(runzhiwang): implement transfer leadership in server
-    return null;
+    return waitForReply(request, transferLeadershipAsync(request));
+  }
+
+  private CompletableFuture<RaftClientReply> logAndReturnTransferLeadershipFail(
+      TransferLeadershipRequest request, String msg) {
+    LOG.warn(msg);
+    return CompletableFuture.completedFuture(newExceptionReply(request, new StateMachineException(msg)));
+  }
+
+  boolean isSteppingDown() {
+    return finishTransferLeader != null;
+  }
+
+  private Consumer<RaftPeerId> finishTransferLeader;
+
+  public Consumer<RaftPeerId> finishTransferLeader() {
+    return finishTransferLeader;
+  }
+
+  public Consumer<RaftPeerId> setFinishTransferLeader(Consumer<RaftPeerId> consumer) {
+    return finishTransferLeader = consumer;
+  }
+
+  private void timeoutTransferLeadership(
+      TransferLeadershipRequest request, CompletableFuture<RaftClientReply> replyFuture)
+      throws StateMachineException {
+    synchronized (replyFuture) {
+      if (replyFuture.isDone()) {
+        return;
+      }
+
+      setFinishTransferLeader(null);
+
+      if (state.getLeaderId().equals(request.getNewLeader())) {
+        replyFuture.complete(newSuccessReply(request));
+      } else {
+        StateMachineException sme = new StateMachineException("Failed to transfer leadership");
+        replyFuture.complete(newExceptionReply(request, sme));
+        throw sme;
+      }
+    }
   }
 
   @Override
   public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request)
       throws IOException {
-    //TODO(runzhiwang): implement transfer leadership in server
-    return null;
+    LOG.info("{}: receive transferLeadership {}", getMemberId(), request);
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    assertGroup(request.getRequestorId(), request.getRaftGroupId());
+
+    synchronized (this) {
+      CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, false);
+      if (reply != null) {
+        return reply;
+      }
+
+      if (getId().equals(request.getNewLeader())) {
+        return CompletableFuture.completedFuture(newSuccessReply(request));
+      }
+
+      final RaftConfigurationImpl conf = getRaftConf();
+      final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
+
+      // make sure there is no raft reconfiguration in progress
+      if (!conf.isStable() || leaderState.inStagingState() || !state.isConfCommitted()) {
+        String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
+            " when raft reconfiguration in progress.";
+        return logAndReturnTransferLeadershipFail(request, msg);
+      }
+
+      if (!conf.containsInConf(request.getNewLeader())) {
+        String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
+            " as it is not in " + conf;
+        return logAndReturnTransferLeadershipFail(request, msg);
+      }
+
+      if (!conf.isHighestPriority(request.getNewLeader())) {
+        String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
+            " as it does not has highest priority " + conf;
+        return logAndReturnTransferLeadershipFail(request, msg);
+      }
+
+      CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
+
+      setFinishTransferLeader(currLeader -> {
+        synchronized (replyFuture) {
+          if (currLeader == null || replyFuture.isDone()) {
+            return;
+          }
+
+          if (currLeader.equals(request.getNewLeader())) {
+            replyFuture.complete(newSuccessReply(request));
+            setFinishTransferLeader(null);
+          }
+        }
+      });
+
+      scheduler.onTimeout(TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS),
+          () -> timeoutTransferLeadership(request, replyFuture),
+          LOG, () -> "Timeout check failed for append entry request: " + request);
+
+      return replyFuture;

Review comment:
       This part can be moved to TransferLeadership.start(TransferLeadershipRequest).







[GitHub] [incubator-ratis] runzhiwang commented on pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#issuecomment-750226682


   @szetszwo Thanks very much for the code; I will use it.





[GitHub] [incubator-ratis] runzhiwang commented on pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#issuecomment-749875466


   @szetszwo Hi, this is only a draft. Could you give it a brief review?
   
   1. I did not include changing priority in TransferLeadershipRequest; I think changing priority should be left to the user. For example, suppose there are 3 servers (server1, server2, server3) and we want to transfer leadership from server1 to server2. We can assign server2 the highest priority, but we cannot know whether server1's priority should be higher or lower than server3's, so I think this should be left to the user to decide. On transferLeadership, I check whether server2 has the highest priority.
   
   2. I am not sure whether TimeoutScheduler is enough. If the user passes a large timeout, such as 100 seconds, but transferLeadership finishes in 1 second, we should not wait 100 seconds before returning to the client, so maybe we have to check periodically in order to return to the client as soon as possible.








[GitHub] [incubator-ratis] runzhiwang commented on pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#issuecomment-749978687


   @szetszwo I have updated the patch. Could you help review it again?





[GitHub] [incubator-ratis] szetszwo commented on pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#issuecomment-750036972


   I tried to move the code during the review, but I have not tested it.
   ```
   class TransferLeadership {
     public static final Logger LOG = LoggerFactory.getLogger(TransferLeadership.class);
   
     class PendingRequest {
       private final TransferLeadershipRequest request;
       private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
   
       PendingRequest(TransferLeadershipRequest request) {
         this.request = request;
       }
   
       TransferLeadershipRequest getRequest() {
         return request;
       }
   
       CompletableFuture<RaftClientReply> getReplyFuture() {
         return replyFuture;
       }
   
       void complete(RaftPeerId currentLeader) {
         if (replyFuture.isDone()) {
           return;
         }
   
         if (currentLeader.equals(request.getNewLeader())) {
           replyFuture.complete(server.newSuccessReply(request));
         } else {
           final TransferLeadershipException e = new TransferLeadershipException(
               "Failed to transfer leadership to " + request.getNewLeader() + ": current leader is " + currentLeader);
           replyFuture.complete(server.newExceptionReply(request, e));
         }
       }
   
       @Override
       public String toString() {
         return request.toString();
       }
     }
   
     private final RaftServerImpl server;
     private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
     private final AtomicReference<PendingRequest> pending = new AtomicReference<>();
   
     TransferLeadership(RaftServerImpl server) {
       this.server = server;
     }
   
     CompletableFuture<RaftClientReply> start(TransferLeadershipRequest request) {
       final MemoizedSupplier<PendingRequest> supplier = JavaUtils.memoize(() -> new PendingRequest(request));
       final PendingRequest previous = pending.getAndUpdate(f -> f != null? f: supplier.get());
       if (previous != null) {
         if (request.getNewLeader().equals(previous.getRequest().getNewLeader())) {
           final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
           previous.getReplyFuture().whenComplete((r, e) -> {
             if (e != null) {
               replyFuture.completeExceptionally(e);
             } else {
               replyFuture.complete(r.isSuccess()? server.newSuccessReply(request)
                   : server.newExceptionReply(request, r.getException()));
             }
           });
           return replyFuture;
         } else {
           final StateMachineException sme = new StateMachineException(
               "Failed to transfer leadership to " + request.getNewLeader() + ": a previous " + previous + " exists");
           return CompletableFuture.completedFuture(server.newExceptionReply(request, sme));
         }
       }
   
       scheduler.onTimeout(TimeDuration.ONE_SECOND,
           () -> finish(server.getState().getLeaderId()),
           LOG, () -> "Timeout check failed for transfer leadership request: " + request);
       return supplier.get().getReplyFuture();
     }
   
     void finish(RaftPeerId currentLeader) {
       Optional.ofNullable(pending.getAndSet(null))
           .ifPresent(r -> r.complete(currentLeader));
     }
   }
   ```





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#discussion_r548321499



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -866,18 +880,112 @@ RaftClientReply waitForReply(RaftClientRequest request, CompletableFuture<RaftCl
 
   @Override
   public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
-    //TODO(runzhiwang): implement transfer leadership in server
-    return null;
+    return waitForReply(request, transferLeadershipAsync(request));
+  }
+
+  private CompletableFuture<RaftClientReply> logAndReturnTransferLeadershipFail(
+      TransferLeadershipRequest request, String msg) {
+    LOG.warn(msg);
+    return CompletableFuture.completedFuture(newExceptionReply(request, new StateMachineException(msg)));
+  }
+
+  boolean isSteppingDown() {
+    return finishTransferLeader != null;
+  }
+
+  private Consumer<RaftPeerId> finishTransferLeader;
+
+  public Consumer<RaftPeerId> finishTransferLeader() {
+    return finishTransferLeader;
+  }
+
+  public Consumer<RaftPeerId> setFinishTransferLeader(Consumer<RaftPeerId> consumer) {
+    return finishTransferLeader = consumer;
+  }
+
+  private void timeoutTransferLeadership(
+      TransferLeadershipRequest request, CompletableFuture<RaftClientReply> replyFuture)
+      throws StateMachineException {
+    synchronized (replyFuture) {

Review comment:
       With the new `AtomicReference<PendingRequest> pending`, we do not need synchronized anymore, thanks a lot.
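
A minimal, self-contained sketch of why the `AtomicReference` makes the `synchronized` block unnecessary. It is not the Ratis code: `PendingSlot`, `Pending` and the string replies are hypothetical stand-ins, and the candidate is created eagerly instead of through a memoized supplier. The idea is that `getAndUpdate` installs a new entry only when the slot is empty, and `getAndSet(null)` hands the entry to exactly one finisher.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only; names do not come from the pull request.
final class PendingSlot {
  /** Stand-in for a pending transfer: the requested target and its reply future. */
  static final class Pending {
    final String target;
    final CompletableFuture<String> reply = new CompletableFuture<>();

    Pending(String target) {
      this.target = target;
    }
  }

  private final AtomicReference<Pending> pending = new AtomicReference<>();

  /** Returns the entry occupying the slot: the existing one, or a new one if the slot was empty. */
  Pending start(String target) {
    final Pending candidate = new Pending(target);
    // The update function may be retried under contention, so it only selects
    // between two already-existing objects and has no side effects.
    final Pending previous = pending.getAndUpdate(p -> p != null ? p : candidate);
    return previous != null ? previous : candidate;
  }

  /** Atomically empties the slot; only the caller that obtains the entry completes it. */
  void finish(String currentLeader) {
    final Pending p = pending.getAndSet(null);
    if (p != null) {
      p.reply.complete(currentLeader.equals(p.target) ? "SUCCESS" : "FAILED");
    }
  }

  public static void main(String[] args) {
    final PendingSlot slot = new PendingSlot();
    final Pending first = slot.start("s2");
    final Pending second = slot.start("s3"); // slot is occupied, so the first entry is returned
    System.out.println(first == second);
    slot.finish("s2");
    System.out.println(first.reply.join());
  }
}
```

Because both operations are single atomic steps on the reference, two concurrent callers of `finish` cannot both complete the same entry, which is the property the lock was protecting.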







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#discussion_r547916107



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
##########
@@ -145,6 +145,17 @@ boolean containsInConf(RaftPeerId peerId) {
     return conf.contains(peerId);
   }
 
+  boolean isHighestPriority(RaftPeerId peerId) {
+    RaftPeer target = getPeer(peerId);
+    Collection<RaftPeer> peers = getCurrentPeers();
+    for (RaftPeer peer : peers) {
+      if (peer.getPriority() >= target.getPriority() && !peer.equals(target)) {

Review comment:
       @szetszwo For example, if server1, server2 and server3 all have the same priority, we cannot transfer leadership from server1 to server2, because server1 only calls `yieldLeaderToHigherPriorityPeer` when it finds that its priority is lower than a follower's priority. So server2 must have the highest priority.
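
The rule described above can be sketched as a standalone check. This is an illustration under assumptions, not the method from the diff: `PriorityCheck`, `Peer` and `isStrictlyHighest` are hypothetical names, and the sketch assumes the check rejects the target as soon as any other peer has an equal or higher priority.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical illustration only; Peer is a stand-in for RaftPeer.
final class PriorityCheck {
  static final class Peer {
    final String id;
    final int priority;

    Peer(String id, int priority) {
      this.id = id;
      this.priority = priority;
    }
  }

  /** True only if every other peer has a strictly lower priority than the target. */
  static boolean isStrictlyHighest(Peer target, Collection<Peer> peers) {
    for (Peer peer : peers) {
      if (peer != target && peer.priority >= target.priority) {
        return false; // a tie is enough to refuse the transfer
      }
    }
    return true;
  }

  public static void main(String[] args) {
    final Peer s1 = new Peer("server1", 0);
    final Peer s2 = new Peer("server2", 0);
    final Peer s3 = new Peer("server3", 0);
    final List<Peer> equal = Arrays.asList(s1, s2, s3);
    System.out.println(isStrictlyHighest(s2, equal)); // all priorities are equal

    final Peer s2Raised = new Peer("server2", 1);
    System.out.println(isStrictlyHighest(s2Raised, Arrays.asList(s1, s2Raised, s3)));
  }
}
```

With three peers of equal priority the check fails for server2, which matches the case in the comment; it passes only once server2's priority is raised above the others.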
   
   
   







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#discussion_r547916752



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
##########
@@ -145,6 +145,17 @@ boolean containsInConf(RaftPeerId peerId) {
     return conf.contains(peerId);
   }
 
+  boolean isHighestPriority(RaftPeerId peerId) {
+    RaftPeer target = getPeer(peerId);
+    Collection<RaftPeer> peers = getCurrentPeers();
+    for (RaftPeer peer : peers) {
+      if (peer.getPriority() >= target.getPriority() && !peer.equals(target)) {

Review comment:
       I see.  Thanks.







[GitHub] [incubator-ratis] runzhiwang removed a comment on pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
runzhiwang removed a comment on pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#issuecomment-750245608


   > Let's add new TransferLeadership class and put all the code there.
   
   @szetszwo Do I need to put the following check code in the TransferLeadership class? If so, I need to change some methods from private to public, such as `checkLeaderState`.
   
   ```
         CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, false);
         if (reply != null) {
           return reply;
         }
   
         if (getId().equals(request.getNewLeader())) {
           return CompletableFuture.completedFuture(newSuccessReply(request));
         }
   
         final RaftConfigurationImpl conf = getRaftConf();
         final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
   
      
         if (!conf.isStable() || leaderState.inStagingState() || !state.isConfCommitted()) {
           String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
               " when raft reconfiguration in progress.";
           return logAndReturnTransferLeadershipFail(request, msg);
         }
   
         if (!conf.containsInConf(request.getNewLeader())) {
           String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
               " as it is not in " + conf;
           return logAndReturnTransferLeadershipFail(request, msg);
         }
   
         if (!conf.isHighestPriority(request.getNewLeader())) {
           String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
               " as it does not have the highest priority in " + conf;
           return logAndReturnTransferLeadershipFail(request, msg);
         }
   
   ```





[GitHub] [incubator-ratis] szetszwo commented on pull request #372: RATIS-1260. Implement transferLeaderShip in server

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #372:
URL: https://github.com/apache/incubator-ratis/pull/372#issuecomment-749899631


   > I did not include change priority in TransferLeaderShipRequest, ...
   
   Sounds great.

