You are viewing a plain text version of this content. The link to its canonical version was not preserved in this copy.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/27 04:39:32 UTC

[incubator-ratis] branch master updated: RATIS-1266. Leader send StartLeaderElectionRequest to higher priority peer (#376)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 79fd5af  RATIS-1266. Leader send StartLeaderElectionRequest to higher priority peer (#376)
79fd5af is described below

commit 79fd5afde2b4cd92ef86ebea7a047204d981a2a2
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Sun Dec 27 12:39:21 2020 +0800

    RATIS-1266. Leader send StartLeaderElectionRequest to higher priority peer (#376)
    
    * RATIS-1266. Leader send TimeoutNow request to higher priority peer
---
 .../grpc/server/GrpcServerProtocolClient.java      |  7 +++
 .../grpc/server/GrpcServerProtocolService.java     | 14 ++++++
 .../org/apache/ratis/grpc/server/GrpcService.java  |  8 ++++
 .../ratis/hadooprpc/server/HadoopRpcService.java   |  9 ++++
 .../RaftServerProtocolServerSideTranslatorPB.java  |  9 ++++
 .../src/main/proto/HadoopCompatability.proto       |  1 +
 .../java/org/apache/ratis/netty/NettyRpcProxy.java |  2 +
 .../apache/ratis/netty/server/NettyRpcService.java | 19 ++++++++
 ratis-proto/src/main/proto/Grpc.proto              |  3 ++
 ratis-proto/src/main/proto/Netty.proto             |  2 +
 ratis-proto/src/main/proto/Raft.proto              |  9 ++++
 .../ratis/server/protocol/RaftServerProtocol.java  |  4 ++
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 39 +++++++++++------
 .../apache/ratis/server/impl/RaftServerImpl.java   | 50 ++++++++++++++++++++++
 .../apache/ratis/server/impl/RaftServerProxy.java  |  7 +++
 .../apache/ratis/server/impl/ServerProtoUtils.java | 17 ++++++++
 .../server/impl/BlockRequestHandlingInjection.java |  1 +
 .../ratis/server/simulation/RaftServerReply.java   | 33 ++++++++++++--
 .../ratis/server/simulation/RaftServerRequest.java | 32 ++++++++++++--
 .../server/simulation/SimulatedServerRpc.java      | 12 ++++++
 .../ratis/datastream/DataStreamBaseTest.java       |  7 +++
 21 files changed, 265 insertions(+), 20 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index ea1db78..fcf8126 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -100,6 +100,13 @@ public class GrpcServerProtocolClient implements Closeable {
     return r;
   }
 
+  /** Issue a blocking startLeaderElection RPC whose deadline is taken from requestTimeoutDuration. */
+  public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) {
+    return blockingStub
+        .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+        .startLeaderElection(request);
+  }
+
   StreamObserver<AppendEntriesRequestProto> appendEntries(
       StreamObserver<AppendEntriesReplyProto> responseHandler) {
     return asyncStub.appendEntries(responseHandler);
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 66d69b6..90386fd 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -177,6 +177,20 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
   }
 
   @Override
+  public void startLeaderElection(StartLeaderElectionRequestProto request,
+      StreamObserver<StartLeaderElectionReplyProto> responseObserver) {
+    // Any failure, including one raised while sending the reply, is reported to the caller through onError.
+    try {
+      responseObserver.onNext(server.startLeaderElection(request));
+      responseObserver.onCompleted();
+    } catch (Throwable t) {
+      GrpcUtil.warn(LOG,
+          () -> getId() + ": Failed startLeaderElection " + ProtoUtils.toString(request.getServerRequest()), t);
+      responseObserver.onError(GrpcUtil.wrapException(t));
+    }
+  }
+
+  @Override
   public StreamObserver<AppendEntriesRequestProto> appendEntries(
       StreamObserver<AppendEntriesReplyProto> responseObserver) {
     return new ServerRequestStreamObserver<AppendEntriesRequestProto, AppendEntriesReplyProto>(
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index 57500e7..f570672 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -223,4 +223,12 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
     final RaftPeerId target = RaftPeerId.valueOf(request.getServerRequest().getReplyId());
     return getProxies().getProxy(target).requestVote(request);
   }
+
+  @Override
+  public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
+    CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, getId(), null, request);
+    // The request is delivered through the proxy of the peer named as its replyId.
+    final RaftPeerId replyPeerId = RaftPeerId.valueOf(request.getServerRequest().getReplyId());
+    return getProxies().getProxy(replyPeerId).startLeaderElection(request);
+  }
 }
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
index a9ca1a9..52c2c31 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -30,6 +30,8 @@ import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
 import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
 import org.apache.ratis.proto.hadoop.HadoopCompatibilityProtos.HadoopServerProtocolService;
 import org.apache.ratis.proto.hadoop.HadoopCompatibilityProtos.HadoopClientProtocolService;
 import org.apache.ratis.proto.hadoop.HadoopCompatibilityProtos.ServerOps;
@@ -183,6 +185,14 @@ public final class HadoopRpcService extends RaftServerRpcWithProxy<Proxy<RaftSer
         ServerOps.requestVote, RequestVoteReplyProto::parseFrom);
   }
 
+  /** Send the request through processRequest, keyed by its replyId, and parse the StartLeaderElection reply. */
+  @Override
+  public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request)
+      throws IOException {
+    return processRequest(request, request.getServerRequest().getReplyId(), ServerOps.startLeaderElection,
+        StartLeaderElectionReplyProto::parseFrom);
+  }
+
   private <REQUEST extends GeneratedMessageV3, REPLY> REPLY processRequest(
       REQUEST request, org.apache.ratis.thirdparty.com.google.protobuf.ByteString replyId,
       ServerOps type, CheckedFunction<byte[], REPLY, InvalidProtocolBufferException> func)
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolServerSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolServerSideTranslatorPB.java
index c5d1948..9f61207 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolServerSideTranslatorPB.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolServerSideTranslatorPB.java
@@ -33,6 +33,8 @@ import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
 import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
 import org.apache.ratis.thirdparty.com.google.protobuf.GeneratedMessageV3;
 
 @InterfaceAudience.Private
@@ -54,6 +56,9 @@ public class RaftServerProtocolServerSideTranslatorPB
         case requestVote:
           respone = requestVote(RequestVoteRequestProto.parseFrom(buffer));
           break;
+        case startLeaderElection:
+          respone = startLeaderElection(StartLeaderElectionRequestProto.parseFrom(buffer));
+          break;
         case installSnapshot:
           respone = installSnapshot(InstallSnapshotRequestProto.parseFrom(buffer));
           break;
@@ -77,6 +82,12 @@ public class RaftServerProtocolServerSideTranslatorPB
     return impl.requestVote(request);
   }
 
+  /** Hand a decoded StartLeaderElection request to {@code impl} and return its reply unchanged. */
+  public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request)
+      throws IOException {
+    return impl.startLeaderElection(request);
+  }
+
   public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request)
       throws IOException {
     return impl.appendEntries(request);
diff --git a/ratis-hadoop/src/main/proto/HadoopCompatability.proto b/ratis-hadoop/src/main/proto/HadoopCompatability.proto
index 9fec7c2..1b67755 100644
--- a/ratis-hadoop/src/main/proto/HadoopCompatability.proto
+++ b/ratis-hadoop/src/main/proto/HadoopCompatability.proto
@@ -34,6 +34,7 @@ enum ServerOps {
   requestVote = 1;
   appendEntries = 2;
   installSnapshot = 3;
+  startLeaderElection = 4; // dispatched by RaftServerProtocolServerSideTranslatorPB to impl.startLeaderElection
 }
 
 message ServerRequestProto {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
index 65a8052..b2b8763 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -79,6 +79,8 @@ public class NettyRpcProxy implements Closeable {
     switch (proto.getRaftNettyServerReplyCase()) {
       case REQUESTVOTEREPLY:
         return proto.getRequestVoteReply().getServerReply().getCallId();
+      case STARTLEADERELECTIONREPLY: // callId lives in serverReply, as for the other reply kinds
+        return proto.getStartLeaderElectionReply().getServerReply().getCallId();
       case APPENDENTRIESREPLY:
         return proto.getAppendEntriesReply().getServerReply().getCallId();
       case INSTALLSNAPSHOTREPLY:
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 7d96579..50279d6 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -184,6 +184,13 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy,
               .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(transferLeadershipReply))
               .build();
 
+        case STARTLEADERELECTIONREQUEST:
+          final StartLeaderElectionRequestProto startLeaderElectionRequest = proto.getStartLeaderElectionRequest();
+          rpcRequest = startLeaderElectionRequest.getServerRequest();
+          final StartLeaderElectionReplyProto startLeaderElectionReply =
+              server.startLeaderElection(startLeaderElectionRequest);
+          return RaftNettyServerReplyProto.newBuilder().setStartLeaderElectionReply(startLeaderElectionReply).build();
+
         case APPENDENTRIESREQUEST:
           final AppendEntriesRequestProto appendEntriesRequest = proto.getAppendEntriesRequest();
           rpcRequest = appendEntriesRequest.getServerRequest();
@@ -282,6 +289,18 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy,
     return sendRaftNettyServerRequestProto(serverRequest, proto).getRequestVoteReply();
   }
 
+  /** Wrap the request in a RaftNettyServerRequestProto, send it, and return the StartLeaderElection reply. */
+  @Override
+  public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
+    CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, getId(), null, request);
+
+    final RaftNettyServerRequestProto proto = RaftNettyServerRequestProto.newBuilder()
+        .setStartLeaderElectionRequest(request)
+        .build();
+    final RaftRpcRequestProto serverRequest = request.getServerRequest();
+    return sendRaftNettyServerRequestProto(serverRequest, proto).getStartLeaderElectionReply();
+  }
+
   @Override
   public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException {
     CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, getId(), null, request);
diff --git a/ratis-proto/src/main/proto/Grpc.proto b/ratis-proto/src/main/proto/Grpc.proto
index bd97961..14b6067 100644
--- a/ratis-proto/src/main/proto/Grpc.proto
+++ b/ratis-proto/src/main/proto/Grpc.proto
@@ -44,6 +44,9 @@ service RaftServerProtocolService {
   rpc requestVote(ratis.common.RequestVoteRequestProto)
       returns(ratis.common.RequestVoteReplyProto) {}
 
+  rpc startLeaderElection(ratis.common.StartLeaderElectionRequestProto) // leader asks a follower to campaign
+      returns(ratis.common.StartLeaderElectionReplyProto) {} // unary: one request, one reply (no stream)
+
   rpc appendEntries(stream ratis.common.AppendEntriesRequestProto)
       returns(stream ratis.common.AppendEntriesReplyProto) {}
 
diff --git a/ratis-proto/src/main/proto/Netty.proto b/ratis-proto/src/main/proto/Netty.proto
index 17155a4..cea3ada 100644
--- a/ratis-proto/src/main/proto/Netty.proto
+++ b/ratis-proto/src/main/proto/Netty.proto
@@ -39,6 +39,7 @@ message RaftNettyServerRequestProto {
     ratis.common.GroupListRequestProto groupListRequest = 7;
     ratis.common.GroupInfoRequestProto groupInfoRequest = 8;
     ratis.common.TransferLeadershipRequestProto transferLeadershipRequest = 9;
+    ratis.common.StartLeaderElectionRequestProto startLeaderElectionRequest = 10; // -> server.startLeaderElection
   }
 }
 
@@ -51,5 +52,6 @@ message RaftNettyServerReplyProto {
     ratis.common.GroupListReplyProto groupListReply = 5;
     ratis.common.GroupInfoReplyProto groupInfoReply = 6;
     RaftNettyExceptionReplyProto exceptionReply = 7;
+    ratis.common.StartLeaderElectionReplyProto startLeaderElectionReply = 8; // reply to startLeaderElectionRequest
   }
 }
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 1d5e57e..822783a 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -392,6 +392,15 @@ message TransferLeadershipRequestProto {
   RaftPeerProto newLeader = 2;
 }
 
+message StartLeaderElectionRequestProto { // sent by the leader to a higher-priority follower
+  RaftRpcRequestProto serverRequest = 1;
+  TermIndexProto leaderLastEntry = 2; // the leader's last log entry; left unset when the leader has none
+}
+
+message StartLeaderElectionReplyProto {
+  RaftRpcReplyProto serverReply = 1; // success tells whether the receiver changed to candidate
+}
+
 // A request to add a new group
 message GroupAddRequestProto {
   RaftGroupProto group = 1; // the group to be added.
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerProtocol.java b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerProtocol.java
index c8cfad6..6e707ee 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerProtocol.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerProtocol.java
@@ -25,6 +25,8 @@ import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
 import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
 
 public interface RaftServerProtocol {
   enum Op {REQUEST_VOTE, APPEND_ENTRIES, INSTALL_SNAPSHOT}
@@ -34,4 +36,6 @@ public interface RaftServerProtocol {
   AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException;
 
   InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException;
+  /** Ask this server (sent by the leader) to start a leader election; note {@link Op} has no constant for it. */
+  StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException;
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index d54e337..4bf79c2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -23,6 +23,8 @@ import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
@@ -532,16 +534,32 @@ class LeaderStateImpl implements LeaderState {
     }
   }
 
-  private synchronized void yieldLeaderToHigherPriorityPeer(long term, TermIndex lastEntry) {
+  /**
+   * Ask the higher-priority {@code follower} to start a leader election, provided this leader's last entry
+   * still equals {@code lastEntry}. The RPC runs via CompletableFuture.runAsync and its outcome is only logged.
+   */
+  private synchronized void sendStartLeaderElectionToHigherPriorityPeer(RaftPeerId follower, TermIndex lastEntry) {
     ServerState state = server.getState();
     TermIndex currLastEntry = state.getLastEntry();
     if (ServerState.compareLog(currLastEntry, lastEntry) != 0) {
-      LOG.warn("{} can not stepDown because currLastEntry:{} did not match lastEntry:{}",
-          this, currLastEntry, lastEntry);
+      LOG.warn("{} can not send StartLeaderElectionRequest to follower:{} because currLastEntry:{} " +
+              "did not match lastEntry:{}", this, follower, currLastEntry, lastEntry);
       return;
     }
 
-    stepDown(term, StepDownReason.HIGHER_PRIORITY);
+    final StartLeaderElectionRequestProto r = ServerProtoUtils.toStartLeaderElectionRequestProto(
+        server.getMemberId(), follower, lastEntry);
+    // Fire-and-forget: the future is never joined, so every failure must be logged inside the task.
+    CompletableFuture.runAsync(() -> {
+      try {
+        final StartLeaderElectionReplyProto replyProto = server.getServerRpc().startLeaderElection(r);
+        LOG.info("{} received {} reply of StartLeaderElectionRequest from follower:{}",
+            this, replyProto.getServerReply().getSuccess() ? "success" : "fail", follower);
+      } catch (IOException | RuntimeException e) {
+        // Unchecked failures matter too: e.g. a gRPC blocking stub reports RPC errors as StatusRuntimeException.
+        LOG.warn("{} send StartLeaderElectionRequest throw exception", this, e);
+      }
+    });
   }
 
   private void prepare() {
@@ -872,23 +890,21 @@ class LeaderStateImpl implements LeaderState {
 
       final TermIndex leaderLastEntry = server.getState().getLastEntry();
       if (leaderLastEntry == null) {
-        LOG.info("{} stepDown leadership on term:{} because follower {}'s priority:{} is higher than leader's:{} " +
-                "and leader's lastEntry is null",
-            this, currentTerm, followerID, followerPriority, leaderPriority);
+        LOG.info("{} send StartLeaderElectionRequest to follower:{} on term:{} because follower's priority:{} " +
+                "is higher than leader's:{} and leader's lastEntry is null",
+            this, followerID, currentTerm, followerPriority, leaderPriority);
 
-        // step down as follower
-        yieldLeaderToHigherPriorityPeer(currentTerm, leaderLastEntry);
+        sendStartLeaderElectionToHigherPriorityPeer(followerID, leaderLastEntry);
         return;
       }
 
       if (followerInfo.getMatchIndex() >= leaderLastEntry.getIndex()) {
-        LOG.info("{} stepDown leadership on term:{} because follower {}'s priority:{} is higher than leader's:{} " +
-                "and follower's lastEntry index:{} catch up with leader's:{}",
-            this, currentTerm, followerID, followerPriority, leaderPriority, followerInfo.getMatchIndex(),
+        LOG.info("{} send StartLeaderElectionRequest to follower:{} on term:{} because follower's priority:{} " +
+                "is higher than leader's:{} and follower's lastEntry index:{} catch up with leader's:{}",
+            this, followerID, currentTerm, followerPriority, leaderPriority, followerInfo.getMatchIndex(),
             leaderLastEntry.getIndex());
 
-        // step down as follower
-        yieldLeaderToHigherPriorityPeer(currentTerm, leaderLastEntry);
+        sendStartLeaderElectionToHigherPriorityPeer(followerID, leaderLastEntry);
         return;
       }
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f24c3bc..5f2c417 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -96,6 +96,7 @@ class RaftServerImpl implements RaftServer.Division,
   static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
   static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
   static final String LOG_SYNC = APPEND_ENTRIES + ".logComplete";
+  static final String START_LEADER_ELECTION = CLASS_NAME + ".startLeaderElection";
 
   class Info implements DivisionInfo {
     @Override
@@ -1387,6 +1388,65 @@ class RaftServerImpl implements RaftServer.Division,
     return true;
   }
 
+  /**
+   * Handle a request from the leader asking this server to start a leader election.
+   * The request is granted, and this server changes to candidate, only if this server is a follower that
+   * recognizes the requestor as the leader and whose last entry is not behind the leader's last entry.
+   *
+   * @return a reply whose success flag tells whether this server changed to candidate
+   */
+  @Override
+  public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
+    final RaftRpcRequestProto r = request.getServerRequest();
+    final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
+    final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(r.getRaftGroupId());
+    // A proto3 message getter never returns null (it returns the default instance), so presence must be
+    // checked explicitly; otherwise the null check below could never fire for a request without the field.
+    final TermIndex leaderLastEntry = request.hasLeaderLastEntry() ?
+        TermIndex.valueOf(request.getLeaderLastEntry()) : null;
+
+    CodeInjectionForTesting.execute(START_LEADER_ELECTION, getId(), leaderId, request);
+
+    LOG.debug("{}: receive startLeaderElection from:{}, leaderLastEntry:{}",
+        getMemberId(), leaderId, leaderLastEntry);
+
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    assertGroup(leaderId, leaderGroupId);
+
+    synchronized (this) {
+      // leaderLastEntry should not be null because LeaderStateImpl#start appends a placeholder entry,
+      // so the leader of each term should have at least one entry.
+      if (leaderLastEntry == null) {
+        LOG.warn("{}: receive null leaderLastEntry which is unexpected", getMemberId());
+        return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+      }
+
+      // Check life cycle state again to avoid the PAUSING/PAUSED state.
+      assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
+      final boolean recognized = state.recognizeLeader(leaderId, leaderLastEntry.getTerm());
+      if (!recognized) {
+        LOG.warn("{}: Not recognize {} (term={}) as leader, state: {}",
+            getMemberId(), leaderId, leaderLastEntry.getTerm(), state);
+        return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+      }
+
+      if (!getInfo().isFollower()) {
+        LOG.warn("{} refused StartLeaderElectionRequest from {}, because role is:{}",
+            getMemberId(), leaderId, role.getCurrentRole());
+        return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+      }
+
+      if (ServerState.compareLog(state.getLastEntry(), leaderLastEntry) < 0) {
+        LOG.warn("{} refused StartLeaderElectionRequest from {}, because lastEntry:{} less than leaderEntry:{}",
+            getMemberId(), leaderId, state.getLastEntry(), leaderLastEntry);
+        return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+      }
+
+      changeToCandidate();
+      return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), true);
+    }
+  }
+
   private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProto request) throws IOException {
     final RaftRpcRequestProto r = request.getServerRequest();
     final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 6e4c302..e6408f8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -28,6 +28,8 @@ import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
 import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
 import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
@@ -545,6 +547,11 @@ class RaftServerProxy implements RaftServer {
   }
 
   @Override
+  public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
+    return getImpl(request.getServerRequest()).startLeaderElection(request); // resolve division, then delegate
+  }
+
+  @Override
   public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto request) {
     final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
     return submitRequest(groupId, impl -> impl.appendEntriesAsync(request));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index f264652..ca672f3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -58,6 +58,23 @@ final class ServerProtoUtils {
     return b.build();
   }
 
+  /** Build a reply to a StartLeaderElection request; {@code success} is carried in the reply header. */
+  static StartLeaderElectionReplyProto toStartLeaderElectionReplyProto(
+      RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success) {
+    final StartLeaderElectionReplyProto.Builder b = StartLeaderElectionReplyProto.newBuilder();
+    b.setServerReply(toRaftRpcReplyProtoBuilder(requestorId, replyId, success));
+    return b.build();
+  }
+
+  /** Build a StartLeaderElection request from {@code requestorId} addressed to {@code replyId}. */
+  static StartLeaderElectionRequestProto toStartLeaderElectionRequestProto(
+      RaftGroupMemberId requestorId, RaftPeerId replyId, TermIndex lastEntry) {
+    final StartLeaderElectionRequestProto.Builder builder = StartLeaderElectionRequestProto.newBuilder()
+        .setServerRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId));
+    // leaderLastEntry is left unset when lastEntry is null.
+    return lastEntry == null ? builder.build() : builder.setLeaderLastEntry(lastEntry.toProto()).build();
+  }
+
   static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
       RaftPeerId requestorId, RaftGroupMemberId replyId,
       long currentTerm, int requestIndex, InstallSnapshotResult result) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java
index 8566047..3487aa4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java
@@ -33,6 +33,7 @@ public class BlockRequestHandlingInjection implements CodeInjectionForTesting.Co
     CodeInjectionForTesting.put(RaftServerImpl.REQUEST_VOTE, INSTANCE);
     CodeInjectionForTesting.put(RaftServerImpl.APPEND_ENTRIES, INSTANCE);
     CodeInjectionForTesting.put(RaftServerImpl.INSTALL_SNAPSHOT, INSTANCE);
+    CodeInjectionForTesting.put(RaftServerImpl.START_LEADER_ELECTION, INSTANCE); // new server RPC hook point
   }
 
   public static BlockRequestHandlingInjection getInstance() {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java
index d5545cb..5d9d871 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java
@@ -17,11 +17,12 @@
  */
 package org.apache.ratis.server.simulation;
 
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftRpcMessage;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
 import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
 import org.apache.ratis.util.ProtoUtils;
 
 import java.util.Objects;
@@ -30,23 +31,35 @@ public class RaftServerReply implements RaftRpcMessage {
   private final AppendEntriesReplyProto appendEntries;
   private final RequestVoteReplyProto requestVote;
   private final InstallSnapshotReplyProto installSnapshot;
+  private final StartLeaderElectionReplyProto startLeaderElection;
 
   RaftServerReply(AppendEntriesReplyProto a) {
     appendEntries = Objects.requireNonNull(a);
     requestVote = null;
     installSnapshot = null;
+    startLeaderElection = null;
   }
 
   RaftServerReply(RequestVoteReplyProto r) {
     appendEntries = null;
     requestVote = Objects.requireNonNull(r);
     installSnapshot = null;
+    startLeaderElection = null;
   }
 
   RaftServerReply(InstallSnapshotReplyProto i) {
     appendEntries = null;
     requestVote = null;
     installSnapshot = Objects.requireNonNull(i);
+    startLeaderElection = null;
+  }
+
+  /** Wrap a StartLeaderElection reply; the fields of the other reply kinds stay null. */
+  RaftServerReply(StartLeaderElectionReplyProto s) {
+    appendEntries = null;
+    requestVote = null;
+    installSnapshot = null;
+    startLeaderElection = Objects.requireNonNull(s);
   }
 
   boolean isAppendEntries() {
@@ -61,6 +74,10 @@ public class RaftServerReply implements RaftRpcMessage {
     return installSnapshot != null;
   }
 
+  boolean isStartLeaderElection() {
+    return startLeaderElection != null;
+  }
+
   AppendEntriesReplyProto getAppendEntries() {
     return appendEntries;
   }
@@ -73,6 +90,10 @@ public class RaftServerReply implements RaftRpcMessage {
     return installSnapshot;
   }
 
+  StartLeaderElectionReplyProto getStartLeaderElection() {
+    return startLeaderElection;
+  }
+
   @Override
   public boolean isRequest() {
     return false;
@@ -84,8 +105,10 @@ public class RaftServerReply implements RaftRpcMessage {
       return appendEntries.getServerReply().getRequestorId().toStringUtf8();
     } else if (isRequestVote()) {
       return requestVote.getServerReply().getRequestorId().toStringUtf8();
-    } else {
+    } else if (isInstallSnapshot()) {
       return installSnapshot.getServerReply().getRequestorId().toStringUtf8();
+    } else {
+      return startLeaderElection.getServerReply().getRequestorId().toStringUtf8();
     }
   }
 
@@ -95,8 +118,10 @@ public class RaftServerReply implements RaftRpcMessage {
       return appendEntries.getServerReply().getReplyId().toStringUtf8();
     } else if (isRequestVote()) {
       return requestVote.getServerReply().getReplyId().toStringUtf8();
-    } else {
+    } else if (isInstallSnapshot()) {
       return installSnapshot.getServerReply().getReplyId().toStringUtf8();
+    } else {
+      return startLeaderElection.getServerReply().getReplyId().toStringUtf8();
     }
   }
 
@@ -106,8 +131,10 @@ public class RaftServerReply implements RaftRpcMessage {
       return ProtoUtils.toRaftGroupId(appendEntries.getServerReply().getRaftGroupId());
     } else if (isRequestVote()) {
       return ProtoUtils.toRaftGroupId(requestVote.getServerReply().getRaftGroupId());
-    } else {
+    } else if (isInstallSnapshot()) {
       return ProtoUtils.toRaftGroupId(installSnapshot.getServerReply().getRaftGroupId());
+    } else {
+      return ProtoUtils.toRaftGroupId(startLeaderElection.getServerReply().getRaftGroupId());
     }
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java
index 1ec791e..787052d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java
@@ -22,29 +22,41 @@ import org.apache.ratis.protocol.RaftRpcMessage;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
 import org.apache.ratis.util.ProtoUtils;
 
 class RaftServerRequest implements RaftRpcMessage {
   private final AppendEntriesRequestProto appendEntries;
   private final RequestVoteRequestProto requestVote;
   private final InstallSnapshotRequestProto installSnapshot;
+  private final StartLeaderElectionRequestProto startLeaderElection;
 
   RaftServerRequest(AppendEntriesRequestProto a) {
     appendEntries = a;
     requestVote = null;
     installSnapshot = null;
+    startLeaderElection = null;
   }
 
   RaftServerRequest(RequestVoteRequestProto r) {
     appendEntries = null;
     requestVote = r;
     installSnapshot = null;
+    startLeaderElection = null;
   }
 
   RaftServerRequest(InstallSnapshotRequestProto i) {
     appendEntries = null;
     requestVote = null;
     installSnapshot = i;
+    startLeaderElection = null;
+  }
+
+  RaftServerRequest(StartLeaderElectionRequestProto s) {
+    appendEntries = null;
+    requestVote = null;
+    installSnapshot = null;
+    startLeaderElection = s;
   }
 
   boolean isAppendEntries() {
@@ -59,6 +71,10 @@ class RaftServerRequest implements RaftRpcMessage {
     return installSnapshot != null;
   }
 
+  boolean isStartLeaderElection() {
+    return startLeaderElection != null;
+  }
+
   AppendEntriesRequestProto getAppendEntries() {
     return appendEntries;
   }
@@ -71,6 +87,10 @@ class RaftServerRequest implements RaftRpcMessage {
     return installSnapshot;
   }
 
+  StartLeaderElectionRequestProto getStartLeaderElection() {
+    return startLeaderElection;
+  }
+
   @Override
   public boolean isRequest() {
     return true;
@@ -82,8 +102,10 @@ class RaftServerRequest implements RaftRpcMessage {
       return appendEntries.getServerRequest().getRequestorId().toStringUtf8();
     } else if (isRequestVote()) {
       return requestVote.getServerRequest().getRequestorId().toStringUtf8();
-    } else {
+    } else if (isInstallSnapshot()) {
       return installSnapshot.getServerRequest().getRequestorId().toStringUtf8();
+    } else { // constructors set exactly one payload, so this must be startLeaderElection
+      return startLeaderElection.getServerRequest().getRequestorId().toStringUtf8();
     }
   }
 
@@ -93,8 +115,10 @@ class RaftServerRequest implements RaftRpcMessage {
       return appendEntries.getServerRequest().getReplyId().toStringUtf8();
     } else if (isRequestVote()) {
       return requestVote.getServerRequest().getReplyId().toStringUtf8();
-    } else {
+    } else if (isInstallSnapshot()) {
       return installSnapshot.getServerRequest().getReplyId().toStringUtf8();
+    } else { // constructors set exactly one payload, so this must be startLeaderElection
+      return startLeaderElection.getServerRequest().getReplyId().toStringUtf8();
     }
   }
 
@@ -104,8 +128,10 @@ class RaftServerRequest implements RaftRpcMessage {
       return ProtoUtils.toRaftGroupId(appendEntries.getServerRequest().getRaftGroupId());
     } else if (isRequestVote()) {
       return ProtoUtils.toRaftGroupId(requestVote.getServerRequest().getRaftGroupId());
-    } else {
+    } else if (isInstallSnapshot()) {
       return ProtoUtils.toRaftGroupId(installSnapshot.getServerRequest().getRaftGroupId());
+    } else { // constructors set exactly one payload, so this must be startLeaderElection
+      return ProtoUtils.toRaftGroupId(startLeaderElection.getServerRequest().getRaftGroupId());
     }
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index e11c339..1e648dc 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -17,12 +17,14 @@
  */
 package org.apache.ratis.server.simulation;
 
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
 import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
 import org.apache.ratis.protocol.GroupInfoRequest;
 import org.apache.ratis.protocol.GroupListRequest;
 import org.apache.ratis.protocol.GroupManagementRequest;
@@ -126,6 +128,13 @@ class SimulatedServerRpc implements RaftServerRpc {
   }
 
   @Override
+  public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request)
+      throws IOException {
+    RaftServerReply reply = serverHandler.getRpc().sendRequest(new RaftServerRequest(request));
+    return reply.getStartLeaderElection();
+  }
+
+  @Override
   public void addRaftPeers(Collection<RaftPeer> peers) {
     // do nothing
   }
@@ -151,6 +160,8 @@ class SimulatedServerRpc implements RaftServerRpc {
         return new RaftServerReply(server.requestVote(r.getRequestVote()));
       } else if (r.isInstallSnapshot()) {
         return new RaftServerReply(server.installSnapshot(r.getInstallSnapshot()));
+      } else if (r.isStartLeaderElection()) {
+        return new RaftServerReply(server.startLeaderElection(r.getStartLeaderElection()));
       } else {
         throw new IllegalStateException("unexpected state");
       }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index e1b1b1f..b554bf7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -39,6 +39,8 @@ import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
 import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.DataStreamReply;
@@ -253,6 +255,11 @@ abstract class DataStreamBaseTest extends BaseTest {
       }
 
       @Override
+      public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
+        return null;
+      }
+
+      @Override
       public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto request) {
         return null;
       }