You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/22 06:38:45 UTC

[incubator-ratis] branch master updated: RATIS-1254. Add transfer leadership request and proto (#366)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 01b8a85  RATIS-1254. Add transfer leadership request and proto (#366)
01b8a85 is described below

commit 01b8a85d0aedf0ca555bdd83d203893140d8d212
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Tue Dec 22 14:38:39 2020 +0800

    RATIS-1254. Add transfer leadership request and proto (#366)
    
    * RATIS-1254. Add transfer leadership request and proto
    
    * fix code review
---
 .../java/org/apache/ratis/client/RaftClient.java     |  3 +++
 .../apache/ratis/client/impl/ClientProtoUtils.java   | 20 ++++++++++++++++++++
 .../org/apache/ratis/client/impl/RaftClientImpl.java |  8 ++++++++
 .../protocol/RaftClientAsynchronousProtocol.java     |  3 +++
 .../apache/ratis/protocol/RaftClientProtocol.java    |  2 ++
 ...tProtocol.java => TransferLeadershipRequest.java} | 16 +++++++++++-----
 .../ratis/grpc/client/GrpcClientProtocolClient.java  |  8 ++++++++
 .../ratis/grpc/client/GrpcClientProtocolService.java |  9 +++++++++
 .../org/apache/ratis/grpc/client/GrpcClientRpc.java  |  5 +++++
 ...CombinedClientProtocolClientSideTranslatorPB.java | 11 +++++++++++
 ...CombinedClientProtocolServerSideTranslatorPB.java | 11 +++++++++++
 .../ratis/hadooprpc/client/HadoopClientRpc.java      |  2 ++
 .../src/main/proto/HadoopCompatability.proto         |  1 +
 .../apache/ratis/netty/client/NettyClientRpc.java    |  5 +++++
 .../apache/ratis/netty/server/NettyRpcService.java   |  9 +++++++++
 ratis-proto/src/main/proto/Grpc.proto                |  3 +++
 ratis-proto/src/main/proto/Hadoop.proto              |  3 +++
 ratis-proto/src/main/proto/Netty.proto               |  1 +
 ratis-proto/src/main/proto/Raft.proto                |  6 ++++++
 .../org/apache/ratis/server/impl/RaftServerImpl.java | 13 +++++++++++++
 .../apache/ratis/server/impl/RaftServerProxy.java    | 11 +++++++++++
 .../ratis/server/simulation/SimulatedServerRpc.java  |  3 +++
 .../apache/ratis/datastream/DataStreamBaseTest.java  | 12 ++++++++++++
 23 files changed, 160 insertions(+), 5 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index b6486ae..0adf35b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -68,6 +68,9 @@ public interface RaftClient extends Closeable {
   /** Send set configuration request to the raft service. */
   RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException;
 
+  /** Transfer leadership to the given server.*/
+  RaftClientReply transferLeadership(RaftGroupId group, RaftPeerId newLeader) throws IOException;
+
   /** @return a {@link Builder}. */
   static Builder newBuilder() {
     return new Builder();
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 2342ba3..50af252 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -411,6 +411,26 @@ public interface ClientProtoUtils {
         .build();
   }
 
+  static TransferLeadershipRequest toTransferLeadershipRequest(
+      TransferLeadershipRequestProto p) {
+    final RaftRpcRequestProto m = p.getRpcRequest();
+    final RaftPeer newLeader = ProtoUtils.toRaftPeer(p.getNewLeader());
+    return new TransferLeadershipRequest(
+        ClientId.valueOf(m.getRequestorId()),
+        RaftPeerId.valueOf(m.getReplyId()),
+        ProtoUtils.toRaftGroupId(m.getRaftGroupId()),
+        p.getRpcRequest().getCallId(),
+        newLeader.getId());
+  }
+
+  static TransferLeadershipRequestProto toTransferLeadershipRequestProto(
+      TransferLeadershipRequest request) {
+    return TransferLeadershipRequestProto.newBuilder()
+        .setRpcRequest(toRaftRpcRequestProtoBuilder(request))
+        .setNewLeader(RaftPeer.newBuilder().setId(request.getNewLeader()).build().getRaftPeerProto())
+        .build();
+  }
+
   static GroupManagementRequest toGroupManagementRequest(GroupManagementRequestProto p) {
     final RaftRpcRequestProto m = p.getRpcRequest();
     final ClientId clientId = ClientId.valueOf(m.getRequestorId());
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 0c70c88..d5437c7 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -35,6 +35,7 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
 import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.RaftException;
@@ -212,6 +213,13 @@ public final class RaftClientImpl implements RaftClient {
         callId, message, type, slidingWindowEntry);
   }
 
+  @Override
+  public RaftClientReply transferLeadership(RaftGroupId raftGroupId, RaftPeerId newLeader) throws IOException {
+    Objects.requireNonNull(newLeader, "newLeader == null");
+    final long callId = CallId.getAndIncrement();
+    return io().sendRequestWithRetry(() -> new TransferLeadershipRequest(
+        clientId, leaderId, groupId, callId, newLeader));
+  }
 
   // TODO: change peersInNewConf to List<RaftPeer>
   @Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
index 3298431..ad43286 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
@@ -27,4 +27,7 @@ public interface RaftClientAsynchronousProtocol {
 
   CompletableFuture<RaftClientReply> setConfigurationAsync(
       SetConfigurationRequest request) throws IOException;
+
+  CompletableFuture<RaftClientReply> transferLeadershipAsync(
+      TransferLeadershipRequest request) throws IOException;
 }
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
index b3cbcc3..1ac00f3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
@@ -23,4 +23,6 @@ public interface RaftClientProtocol {
   RaftClientReply submitClientRequest(RaftClientRequest request) throws IOException;
 
   RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException;
+
+  RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException;
 }
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/TransferLeadershipRequest.java
similarity index 66%
copy from ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
copy to ratis-common/src/main/java/org/apache/ratis/protocol/TransferLeadershipRequest.java
index b3cbcc3..979ba7a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/TransferLeadershipRequest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,10 +17,16 @@
  */
 package org.apache.ratis.protocol;
 
-import java.io.IOException;
+public class TransferLeadershipRequest extends RaftClientRequest {
+  private final RaftPeerId newLeader;
 
-public interface RaftClientProtocol {
-  RaftClientReply submitClientRequest(RaftClientRequest request) throws IOException;
+  public TransferLeadershipRequest(
+      ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, RaftPeerId newLeader) {
+    super(clientId, serverId, groupId, callId, readRequestType());
+    this.newLeader = newLeader;
+  }
 
-  RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException;
+  public RaftPeerId getNewLeader() {
+    return newLeader;
+  }
 }
\ No newline at end of file
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 9aa133c..e29baa4 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -32,6 +32,7 @@ import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.proto.RaftProtos.TransferLeadershipRequestProto;
 import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
 import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
@@ -178,6 +179,13 @@ public class GrpcClientProtocolClient implements Closeable {
         .setConfiguration(request));
   }
 
+  RaftClientReplyProto transferLeadership(
+      TransferLeadershipRequestProto request) throws IOException {
+    return blockingCall(() -> blockingStub
+        .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+        .transferLeadership(request));
+  }
+
   private static RaftClientReplyProto blockingCall(
       CheckedSupplier<RaftClientReplyProto, StatusRuntimeException> supplier
       ) throws IOException {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
index 8248196..0d08661 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -27,6 +27,7 @@ import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.proto.RaftProtos.TransferLeadershipRequestProto;
 import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.JavaUtils;
@@ -153,6 +154,14 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
   }
 
   @Override
+  public void transferLeadership(TransferLeadershipRequestProto proto,
+      StreamObserver<RaftClientReplyProto> responseObserver) {
+    final TransferLeadershipRequest request = ClientProtoUtils.toTransferLeadershipRequest(proto);
+    GrpcUtil.asyncCall(responseObserver, () -> protocol.transferLeadershipAsync(request),
+        ClientProtoUtils::toRaftClientReplyProto);
+  }
+
+  @Override
   public StreamObserver<RaftClientRequestProto> ordered(StreamObserver<RaftClientReplyProto> responseObserver) {
     final OrderedRequestStreamObserver so = new OrderedRequestStreamObserver(responseObserver);
     orderedStreamObservers.putNew(so);
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index a87b0a4..a579550 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -33,6 +33,7 @@ import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.proto.RaftProtos.TransferLeadershipRequestProto;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.PeerProxyMap;
@@ -106,6 +107,10 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
       final GroupInfoRequestProto proto = ClientProtoUtils.toGroupInfoRequestProto(
           (GroupInfoRequest) request);
       return ClientProtoUtils.toGroupInfoReply(proxy.groupInfo(proto));
+    } else if (request instanceof TransferLeadershipRequest) {
+      final TransferLeadershipRequestProto proto = ClientProtoUtils.toTransferLeadershipRequestProto(
+          (TransferLeadershipRequest) request);
+      return ClientProtoUtils.toRaftClientReply(proxy.transferLeadership(proto));
     } else {
       final CompletableFuture<RaftClientReply> f = sendRequest(request, proxy);
       // TODO: timeout support
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
index e092818..dba6046 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
@@ -34,6 +34,7 @@ import org.apache.ratis.protocol.GroupManagementRequest;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
 import org.apache.ratis.thirdparty.com.google.protobuf
     .GeneratedMessageV3;
 import org.apache.ratis.thirdparty.com.google.protobuf
@@ -78,6 +79,16 @@ public class CombinedClientProtocolClientSideTranslatorPB
   }
 
   @Override
+  public RaftClientReply transferLeadership(TransferLeadershipRequest request)
+      throws IOException {
+    return handleRequest(request,
+        ClientProtoUtils::toTransferLeadershipRequestProto,
+        ClientProtoUtils::toRaftClientReply,
+        ClientOps.transferLeadership,
+        RaftProtos.RaftClientReplyProto::parseFrom);
+  }
+
+  @Override
   public RaftClientReply groupManagement(GroupManagementRequest request) throws IOException {
     return handleRequest(request,
         ClientProtoUtils::toGroupManagementRequestProto,
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java
index 09a10db..2315a3f 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java
@@ -38,6 +38,7 @@ import org.apache.ratis.proto.RaftProtos.GroupListRequestProto;
 import org.apache.ratis.proto.RaftProtos.GroupListReplyProto;
 import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto;
 import org.apache.ratis.proto.RaftProtos.GroupInfoReplyProto;
+import org.apache.ratis.proto.RaftProtos.TransferLeadershipRequestProto;
 import org.apache.ratis.thirdparty.com.google.protobuf.GeneratedMessageV3;
 
 
@@ -72,6 +73,9 @@ public class CombinedClientProtocolServerSideTranslatorPB
       case submitClientRequest:
         response = submitClientRequest(RaftClientRequestProto.parseFrom(buf));
         break;
+      case transferLeadership:
+        response = transferLeadership(TransferLeadershipRequestProto.parseFrom(buf));
+        break;
       default:
       }
     } catch(IOException ioe) {
@@ -117,4 +121,11 @@ public class CombinedClientProtocolServerSideTranslatorPB
     final GroupInfoReply reply = impl.getGroupInfo(request);
     return ClientProtoUtils.toGroupInfoReplyProto(reply);
   }
+
+  public RaftClientReplyProto transferLeadership(TransferLeadershipRequestProto proto)
+      throws IOException {
+    final TransferLeadershipRequest request = ClientProtoUtils.toTransferLeadershipRequest(proto);
+    final RaftClientReply reply = impl.transferLeadership(request);
+    return ClientProtoUtils.toRaftClientReplyProto(reply);
+  }
 }
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
index bd04dc1..e7c4442 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
@@ -52,6 +52,8 @@ public class HadoopClientRpc extends RaftClientRpcWithProxy<CombinedClientProtoc
         return proxy.getGroupList((GroupListRequest) request);
       } else if (request instanceof GroupInfoRequest) {
         return proxy.getGroupInfo((GroupInfoRequest) request);
+      } else if (request instanceof TransferLeadershipRequest) {
+        return proxy.transferLeadership((TransferLeadershipRequest) request);
       } else {
         return proxy.submitClientRequest(request);
       }
diff --git a/ratis-hadoop/src/main/proto/HadoopCompatability.proto b/ratis-hadoop/src/main/proto/HadoopCompatability.proto
index 2d9462d..9fec7c2 100644
--- a/ratis-hadoop/src/main/proto/HadoopCompatability.proto
+++ b/ratis-hadoop/src/main/proto/HadoopCompatability.proto
@@ -52,6 +52,7 @@ enum ClientOps {
     groupManagement = 3;
     groupList = 4;
     groupInfo = 5;
+    transferLeadership = 6;
 }
 
 message ClientRequestProto {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
index f7655f6..12ef0c1 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
@@ -63,6 +63,11 @@ public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
           (GroupInfoRequest)request);
       b.setGroupInfoRequest(proto);
       rpcRequest = proto.getRpcRequest();
+    } else if (request instanceof TransferLeadershipRequest) {
+      final RaftProtos.TransferLeadershipRequestProto proto = ClientProtoUtils.toTransferLeadershipRequestProto(
+          (TransferLeadershipRequest)request);
+      b.setTransferLeadershipRequest(proto);
+      rpcRequest = proto.getRpcRequest();
     } else {
       final RaftClientRequestProto proto = ClientProtoUtils.toRaftClientRequestProto(request);
       b.setRaftClientRequest(proto);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 8b9729f..7d96579 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -175,6 +175,15 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy,
               .setRequestVoteReply(reply)
               .build();
 
+        case TRANSFERLEADERSHIPREQUEST:
+          final TransferLeadershipRequestProto transferLeadershipRequest = proto.getTransferLeadershipRequest();
+          rpcRequest = transferLeadershipRequest.getRpcRequest();
+          final RaftClientReply transferLeadershipReply = server.transferLeadership(
+              ClientProtoUtils.toTransferLeadershipRequest(transferLeadershipRequest));
+          return RaftNettyServerReplyProto.newBuilder()
+              .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(transferLeadershipReply))
+              .build();
+
         case APPENDENTRIESREQUEST:
           final AppendEntriesRequestProto appendEntriesRequest = proto.getAppendEntriesRequest();
           rpcRequest = appendEntriesRequest.getServerRequest();
diff --git a/ratis-proto/src/main/proto/Grpc.proto b/ratis-proto/src/main/proto/Grpc.proto
index 497e3fd..bd97961 100644
--- a/ratis-proto/src/main/proto/Grpc.proto
+++ b/ratis-proto/src/main/proto/Grpc.proto
@@ -28,6 +28,9 @@ service RaftClientProtocolService {
   rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
       returns(ratis.common.RaftClientReplyProto) {}
 
+  rpc transferLeadership(ratis.common.TransferLeadershipRequestProto)
+      returns(ratis.common.RaftClientReplyProto) {}
+      
   // A client-to-server stream RPC to ordered async requests
   rpc ordered(stream ratis.common.RaftClientRequestProto)
       returns (stream ratis.common.RaftClientReplyProto) {}
diff --git a/ratis-proto/src/main/proto/Hadoop.proto b/ratis-proto/src/main/proto/Hadoop.proto
index 5d81b6c..a9a2b12 100644
--- a/ratis-proto/src/main/proto/Hadoop.proto
+++ b/ratis-proto/src/main/proto/Hadoop.proto
@@ -39,6 +39,9 @@ service CombinedClientProtocolService {
 
   rpc groupInfo(ratis.common.GroupInfoRequestProto)
       returns(ratis.common.GroupInfoReplyProto);
+
+  rpc transferLeadership(ratis.common.TransferLeadershipRequestProto)
+      returns(ratis.common.RaftClientReplyProto);
 }
 
 service RaftServerProtocolService {
diff --git a/ratis-proto/src/main/proto/Netty.proto b/ratis-proto/src/main/proto/Netty.proto
index 61d9b28..17155a4 100644
--- a/ratis-proto/src/main/proto/Netty.proto
+++ b/ratis-proto/src/main/proto/Netty.proto
@@ -38,6 +38,7 @@ message RaftNettyServerRequestProto {
     ratis.common.GroupManagementRequestProto groupManagementRequest = 6;
     ratis.common.GroupListRequestProto groupListRequest = 7;
     ratis.common.GroupInfoRequestProto groupInfoRequest = 8;
+    ratis.common.TransferLeadershipRequestProto transferLeadershipRequest = 9;
   }
 }
 
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index fdb186e..d06431a 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -384,6 +384,12 @@ message SetConfigurationRequestProto {
   repeated RaftPeerProto peers = 2;
 }
 
+// transfer leadership request
+message TransferLeadershipRequestProto {
+  RaftRpcRequestProto rpcRequest = 1;
+  RaftPeerProto newLeader = 2;
+}
+
 // A request to add a new group
 message GroupAddRequestProto {
   RaftGroupProto group = 1; // the group to be added.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 6bc6499..0bc5601 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -865,6 +865,19 @@ class RaftServerImpl implements RaftServer.Division,
   }
 
   @Override
+  public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
+    //TODO(runzhiwang): implement transfer leadership in server
+    return null;
+  }
+
+  @Override
+  public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request)
+      throws IOException {
+    //TODO(runzhiwang): implement transfer leadership in server
+    return null;
+  }
+
+    @Override
   public RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException {
     return waitForReply(request, setConfigurationAsync(request));
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 80bfa6d..6e4c302 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -426,6 +426,12 @@ class RaftServerProxy implements RaftServer {
   }
 
   @Override
+  public RaftClientReply transferLeadership(TransferLeadershipRequest request)
+      throws IOException {
+    return getImpl(request.getRaftGroupId()).transferLeadership(request);
+  }
+
+  @Override
   public RaftClientReply groupManagement(GroupManagementRequest request) throws IOException {
     return RaftServerImpl.waitForReply(getId(), request, groupManagementAsync(request),
         e -> RaftClientReply.newBuilder()
@@ -529,6 +535,11 @@ class RaftServerProxy implements RaftServer {
   }
 
   @Override
+  public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request) {
+    return submitRequest(request.getRaftGroupId(), impl -> impl.transferLeadershipAsync(request));
+  }
+
+  @Override
   public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException {
     return getImpl(request.getServerRequest()).requestVote(request);
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index 85b1d83..e11c339 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -31,6 +31,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.util.Daemon;
@@ -178,6 +179,8 @@ class SimulatedServerRpc implements RaftServerRpc {
             server.getGroupInfo((GroupInfoRequest) request));
       } else if (request instanceof SetConfigurationRequest) {
         future = server.setConfigurationAsync((SetConfigurationRequest) request);
+      } else if (request instanceof TransferLeadershipRequest) {
+        future = server.transferLeadershipAsync((TransferLeadershipRequest) request);
       } else {
         future = server.submitClientRequestAsync(request);
       }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 90d2690..e1b1b1f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.datastream;
 
 import org.apache.ratis.BaseTest;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
 import org.apache.ratis.server.DataStreamServer;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.DivisionInfo;
@@ -277,6 +278,11 @@ abstract class DataStreamBaseTest extends BaseTest {
       }
 
       @Override
+      public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
+        return null;
+      }
+
+      @Override
       public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) {
         final MyDivision d = getDivision(request.getRaftGroupId());
         return d.getDataStreamMap()
@@ -301,6 +307,12 @@ abstract class DataStreamBaseTest extends BaseTest {
       }
 
       @Override
+      public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request)
+          throws IOException {
+        return null;
+      }
+
+      @Override
       public GroupListReply getGroupList(GroupListRequest request) {
         return null;
       }