You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/22 06:38:45 UTC
[incubator-ratis] branch master updated: RATIS-1254. Add transfer
leadership request and proto (#366)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 01b8a85 RATIS-1254. Add transfer leadership request and proto (#366)
01b8a85 is described below
commit 01b8a85d0aedf0ca555bdd83d203893140d8d212
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Tue Dec 22 14:38:39 2020 +0800
RATIS-1254. Add transfer leadership request and proto (#366)
* RATIS-1254. Add transfer leadership request and proto
* fix code review
---
.../java/org/apache/ratis/client/RaftClient.java | 3 +++
.../apache/ratis/client/impl/ClientProtoUtils.java | 20 ++++++++++++++++++++
.../org/apache/ratis/client/impl/RaftClientImpl.java | 8 ++++++++
.../protocol/RaftClientAsynchronousProtocol.java | 3 +++
.../apache/ratis/protocol/RaftClientProtocol.java | 2 ++
...tProtocol.java => TransferLeadershipRequest.java} | 16 +++++++++++-----
.../ratis/grpc/client/GrpcClientProtocolClient.java | 8 ++++++++
.../ratis/grpc/client/GrpcClientProtocolService.java | 9 +++++++++
.../org/apache/ratis/grpc/client/GrpcClientRpc.java | 5 +++++
...CombinedClientProtocolClientSideTranslatorPB.java | 11 +++++++++++
...CombinedClientProtocolServerSideTranslatorPB.java | 11 +++++++++++
.../ratis/hadooprpc/client/HadoopClientRpc.java | 2 ++
.../src/main/proto/HadoopCompatability.proto | 1 +
.../apache/ratis/netty/client/NettyClientRpc.java | 5 +++++
.../apache/ratis/netty/server/NettyRpcService.java | 9 +++++++++
ratis-proto/src/main/proto/Grpc.proto | 3 +++
ratis-proto/src/main/proto/Hadoop.proto | 3 +++
ratis-proto/src/main/proto/Netty.proto | 1 +
ratis-proto/src/main/proto/Raft.proto | 6 ++++++
.../org/apache/ratis/server/impl/RaftServerImpl.java | 13 +++++++++++++
.../apache/ratis/server/impl/RaftServerProxy.java | 11 +++++++++++
.../ratis/server/simulation/SimulatedServerRpc.java | 3 +++
.../apache/ratis/datastream/DataStreamBaseTest.java | 12 ++++++++++++
23 files changed, 160 insertions(+), 5 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index b6486ae..0adf35b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -68,6 +68,9 @@ public interface RaftClient extends Closeable {
/** Send set configuration request to the raft service. */
RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException;
+ /** Transfer leadership to the given server.*/
+ RaftClientReply transferLeadership(RaftGroupId group, RaftPeerId newLeader) throws IOException;
+
/** @return a {@link Builder}. */
static Builder newBuilder() {
return new Builder();
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 2342ba3..50af252 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -411,6 +411,26 @@ public interface ClientProtoUtils {
.build();
}
+ static TransferLeadershipRequest toTransferLeadershipRequest(
+ TransferLeadershipRequestProto p) {
+ final RaftRpcRequestProto m = p.getRpcRequest();
+ final RaftPeer newLeader = ProtoUtils.toRaftPeer(p.getNewLeader());
+ return new TransferLeadershipRequest(
+ ClientId.valueOf(m.getRequestorId()),
+ RaftPeerId.valueOf(m.getReplyId()),
+ ProtoUtils.toRaftGroupId(m.getRaftGroupId()),
+ p.getRpcRequest().getCallId(),
+ newLeader.getId());
+ }
+
+ static TransferLeadershipRequestProto toTransferLeadershipRequestProto(
+ TransferLeadershipRequest request) {
+ return TransferLeadershipRequestProto.newBuilder()
+ .setRpcRequest(toRaftRpcRequestProtoBuilder(request))
+ .setNewLeader(RaftPeer.newBuilder().setId(request.getNewLeader()).build().getRaftPeerProto())
+ .build();
+ }
+
static GroupManagementRequest toGroupManagementRequest(GroupManagementRequestProto p) {
final RaftRpcRequestProto m = p.getRpcRequest();
final ClientId clientId = ClientId.valueOf(m.getRequestorId());
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 0c70c88..d5437c7 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -35,6 +35,7 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.RaftException;
@@ -212,6 +213,13 @@ public final class RaftClientImpl implements RaftClient {
callId, message, type, slidingWindowEntry);
}
+ @Override
+ public RaftClientReply transferLeadership(RaftGroupId raftGroupId, RaftPeerId newLeader) throws IOException {
+ Objects.requireNonNull(newLeader, "newLeader == null");
+ final long callId = CallId.getAndIncrement();
+ return io().sendRequestWithRetry(() -> new TransferLeadershipRequest(
+ clientId, leaderId, groupId, callId, newLeader));
+ }
// TODO: change peersInNewConf to List<RaftPeer>
@Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
index 3298431..ad43286 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
@@ -27,4 +27,7 @@ public interface RaftClientAsynchronousProtocol {
CompletableFuture<RaftClientReply> setConfigurationAsync(
SetConfigurationRequest request) throws IOException;
+
+ CompletableFuture<RaftClientReply> transferLeadershipAsync(
+ TransferLeadershipRequest request) throws IOException;
}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
index b3cbcc3..1ac00f3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
@@ -23,4 +23,6 @@ public interface RaftClientProtocol {
RaftClientReply submitClientRequest(RaftClientRequest request) throws IOException;
RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException;
+
+ RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException;
}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/TransferLeadershipRequest.java
similarity index 66%
copy from ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
copy to ratis-common/src/main/java/org/apache/ratis/protocol/TransferLeadershipRequest.java
index b3cbcc3..979ba7a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/TransferLeadershipRequest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,10 +17,16 @@
*/
package org.apache.ratis.protocol;
-import java.io.IOException;
+public class TransferLeadershipRequest extends RaftClientRequest {
+ private final RaftPeerId newLeader;
-public interface RaftClientProtocol {
- RaftClientReply submitClientRequest(RaftClientRequest request) throws IOException;
+ public TransferLeadershipRequest(
+ ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, RaftPeerId newLeader) {
+ super(clientId, serverId, groupId, callId, readRequestType());
+ this.newLeader = newLeader;
+ }
- RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException;
+ public RaftPeerId getNewLeader() {
+ return newLeader;
+ }
}
\ No newline at end of file
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 9aa133c..e29baa4 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -32,6 +32,7 @@ import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.proto.RaftProtos.TransferLeadershipRequestProto;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
@@ -178,6 +179,13 @@ public class GrpcClientProtocolClient implements Closeable {
.setConfiguration(request));
}
+ RaftClientReplyProto transferLeadership(
+ TransferLeadershipRequestProto request) throws IOException {
+ return blockingCall(() -> blockingStub
+ .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+ .transferLeadership(request));
+ }
+
private static RaftClientReplyProto blockingCall(
CheckedSupplier<RaftClientReplyProto, StatusRuntimeException> supplier
) throws IOException {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
index 8248196..0d08661 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -27,6 +27,7 @@ import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.proto.RaftProtos.TransferLeadershipRequestProto;
import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
@@ -153,6 +154,14 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
}
@Override
+ public void transferLeadership(TransferLeadershipRequestProto proto,
+ StreamObserver<RaftClientReplyProto> responseObserver) {
+ final TransferLeadershipRequest request = ClientProtoUtils.toTransferLeadershipRequest(proto);
+ GrpcUtil.asyncCall(responseObserver, () -> protocol.transferLeadershipAsync(request),
+ ClientProtoUtils::toRaftClientReplyProto);
+ }
+
+ @Override
public StreamObserver<RaftClientRequestProto> ordered(StreamObserver<RaftClientReplyProto> responseObserver) {
final OrderedRequestStreamObserver so = new OrderedRequestStreamObserver(responseObserver);
orderedStreamObservers.putNew(so);
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index a87b0a4..a579550 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -33,6 +33,7 @@ import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.proto.RaftProtos.TransferLeadershipRequestProto;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.PeerProxyMap;
@@ -106,6 +107,10 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
final GroupInfoRequestProto proto = ClientProtoUtils.toGroupInfoRequestProto(
(GroupInfoRequest) request);
return ClientProtoUtils.toGroupInfoReply(proxy.groupInfo(proto));
+ } else if (request instanceof TransferLeadershipRequest) {
+ final TransferLeadershipRequestProto proto = ClientProtoUtils.toTransferLeadershipRequestProto(
+ (TransferLeadershipRequest) request);
+ return ClientProtoUtils.toRaftClientReply(proxy.transferLeadership(proto));
} else {
final CompletableFuture<RaftClientReply> f = sendRequest(request, proxy);
// TODO: timeout support
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
index e092818..dba6046 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
@@ -34,6 +34,7 @@ import org.apache.ratis.protocol.GroupManagementRequest;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
import org.apache.ratis.thirdparty.com.google.protobuf
.GeneratedMessageV3;
import org.apache.ratis.thirdparty.com.google.protobuf
@@ -78,6 +79,16 @@ public class CombinedClientProtocolClientSideTranslatorPB
}
@Override
+ public RaftClientReply transferLeadership(TransferLeadershipRequest request)
+ throws IOException {
+ return handleRequest(request,
+ ClientProtoUtils::toTransferLeadershipRequestProto,
+ ClientProtoUtils::toRaftClientReply,
+ ClientOps.transferLeadership,
+ RaftProtos.RaftClientReplyProto::parseFrom);
+ }
+
+ @Override
public RaftClientReply groupManagement(GroupManagementRequest request) throws IOException {
return handleRequest(request,
ClientProtoUtils::toGroupManagementRequestProto,
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java
index 09a10db..2315a3f 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java
@@ -38,6 +38,7 @@ import org.apache.ratis.proto.RaftProtos.GroupListRequestProto;
import org.apache.ratis.proto.RaftProtos.GroupListReplyProto;
import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto;
import org.apache.ratis.proto.RaftProtos.GroupInfoReplyProto;
+import org.apache.ratis.proto.RaftProtos.TransferLeadershipRequestProto;
import org.apache.ratis.thirdparty.com.google.protobuf.GeneratedMessageV3;
@@ -72,6 +73,9 @@ public class CombinedClientProtocolServerSideTranslatorPB
case submitClientRequest:
response = submitClientRequest(RaftClientRequestProto.parseFrom(buf));
break;
+ case transferLeadership:
+ response = transferLeadership(TransferLeadershipRequestProto.parseFrom(buf));
+ break;
default:
}
} catch(IOException ioe) {
@@ -117,4 +121,11 @@ public class CombinedClientProtocolServerSideTranslatorPB
final GroupInfoReply reply = impl.getGroupInfo(request);
return ClientProtoUtils.toGroupInfoReplyProto(reply);
}
+
+ public RaftClientReplyProto transferLeadership(TransferLeadershipRequestProto proto)
+ throws IOException {
+ final TransferLeadershipRequest request = ClientProtoUtils.toTransferLeadershipRequest(proto);
+ final RaftClientReply reply = impl.transferLeadership(request);
+ return ClientProtoUtils.toRaftClientReplyProto(reply);
+ }
}
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
index bd04dc1..e7c4442 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
@@ -52,6 +52,8 @@ public class HadoopClientRpc extends RaftClientRpcWithProxy<CombinedClientProtoc
return proxy.getGroupList((GroupListRequest) request);
} else if (request instanceof GroupInfoRequest) {
return proxy.getGroupInfo((GroupInfoRequest) request);
+ } else if (request instanceof TransferLeadershipRequest) {
+ return proxy.transferLeadership((TransferLeadershipRequest) request);
} else {
return proxy.submitClientRequest(request);
}
diff --git a/ratis-hadoop/src/main/proto/HadoopCompatability.proto b/ratis-hadoop/src/main/proto/HadoopCompatability.proto
index 2d9462d..9fec7c2 100644
--- a/ratis-hadoop/src/main/proto/HadoopCompatability.proto
+++ b/ratis-hadoop/src/main/proto/HadoopCompatability.proto
@@ -52,6 +52,7 @@ enum ClientOps {
groupManagement = 3;
groupList = 4;
groupInfo = 5;
+ transferLeadership = 6;
}
message ClientRequestProto {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
index f7655f6..12ef0c1 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
@@ -63,6 +63,11 @@ public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
(GroupInfoRequest)request);
b.setGroupInfoRequest(proto);
rpcRequest = proto.getRpcRequest();
+ } else if (request instanceof TransferLeadershipRequest) {
+ final RaftProtos.TransferLeadershipRequestProto proto = ClientProtoUtils.toTransferLeadershipRequestProto(
+ (TransferLeadershipRequest)request);
+ b.setTransferLeadershipRequest(proto);
+ rpcRequest = proto.getRpcRequest();
} else {
final RaftClientRequestProto proto = ClientProtoUtils.toRaftClientRequestProto(request);
b.setRaftClientRequest(proto);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 8b9729f..7d96579 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -175,6 +175,15 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy,
.setRequestVoteReply(reply)
.build();
+ case TRANSFERLEADERSHIPREQUEST:
+ final TransferLeadershipRequestProto transferLeadershipRequest = proto.getTransferLeadershipRequest();
+ rpcRequest = transferLeadershipRequest.getRpcRequest();
+ final RaftClientReply transferLeadershipReply = server.transferLeadership(
+ ClientProtoUtils.toTransferLeadershipRequest(transferLeadershipRequest));
+ return RaftNettyServerReplyProto.newBuilder()
+ .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(transferLeadershipReply))
+ .build();
+
case APPENDENTRIESREQUEST:
final AppendEntriesRequestProto appendEntriesRequest = proto.getAppendEntriesRequest();
rpcRequest = appendEntriesRequest.getServerRequest();
diff --git a/ratis-proto/src/main/proto/Grpc.proto b/ratis-proto/src/main/proto/Grpc.proto
index 497e3fd..bd97961 100644
--- a/ratis-proto/src/main/proto/Grpc.proto
+++ b/ratis-proto/src/main/proto/Grpc.proto
@@ -28,6 +28,9 @@ service RaftClientProtocolService {
rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
returns(ratis.common.RaftClientReplyProto) {}
+ rpc transferLeadership(ratis.common.TransferLeadershipRequestProto)
+ returns(ratis.common.RaftClientReplyProto) {}
+
// A client-to-server stream RPC to ordered async requests
rpc ordered(stream ratis.common.RaftClientRequestProto)
returns (stream ratis.common.RaftClientReplyProto) {}
diff --git a/ratis-proto/src/main/proto/Hadoop.proto b/ratis-proto/src/main/proto/Hadoop.proto
index 5d81b6c..a9a2b12 100644
--- a/ratis-proto/src/main/proto/Hadoop.proto
+++ b/ratis-proto/src/main/proto/Hadoop.proto
@@ -39,6 +39,9 @@ service CombinedClientProtocolService {
rpc groupInfo(ratis.common.GroupInfoRequestProto)
returns(ratis.common.GroupInfoReplyProto);
+
+ rpc transferLeadership(ratis.common.TransferLeadershipRequestProto)
+ returns(ratis.common.RaftClientReplyProto);
}
service RaftServerProtocolService {
diff --git a/ratis-proto/src/main/proto/Netty.proto b/ratis-proto/src/main/proto/Netty.proto
index 61d9b28..17155a4 100644
--- a/ratis-proto/src/main/proto/Netty.proto
+++ b/ratis-proto/src/main/proto/Netty.proto
@@ -38,6 +38,7 @@ message RaftNettyServerRequestProto {
ratis.common.GroupManagementRequestProto groupManagementRequest = 6;
ratis.common.GroupListRequestProto groupListRequest = 7;
ratis.common.GroupInfoRequestProto groupInfoRequest = 8;
+ ratis.common.TransferLeadershipRequestProto transferLeadershipRequest = 9;
}
}
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index fdb186e..d06431a 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -384,6 +384,12 @@ message SetConfigurationRequestProto {
repeated RaftPeerProto peers = 2;
}
+// transfer leadership request
+message TransferLeadershipRequestProto {
+ RaftRpcRequestProto rpcRequest = 1;
+ RaftPeerProto newLeader = 2;
+}
+
// A request to add a new group
message GroupAddRequestProto {
RaftGroupProto group = 1; // the group to be added.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 6bc6499..0bc5601 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -865,6 +865,19 @@ class RaftServerImpl implements RaftServer.Division,
}
@Override
+ public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
+ //TODO(runzhiwang): implement transfer leadership in server
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request)
+ throws IOException {
+ //TODO(runzhiwang): implement transfer leadership in server
+ return null;
+ }
+
+ @Override
public RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException {
return waitForReply(request, setConfigurationAsync(request));
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 80bfa6d..6e4c302 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -426,6 +426,12 @@ class RaftServerProxy implements RaftServer {
}
@Override
+ public RaftClientReply transferLeadership(TransferLeadershipRequest request)
+ throws IOException {
+ return getImpl(request.getRaftGroupId()).transferLeadership(request);
+ }
+
+ @Override
public RaftClientReply groupManagement(GroupManagementRequest request) throws IOException {
return RaftServerImpl.waitForReply(getId(), request, groupManagementAsync(request),
e -> RaftClientReply.newBuilder()
@@ -529,6 +535,11 @@ class RaftServerProxy implements RaftServer {
}
@Override
+ public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request) {
+ return submitRequest(request.getRaftGroupId(), impl -> impl.transferLeadershipAsync(request));
+ }
+
+ @Override
public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException {
return getImpl(request.getServerRequest()).requestVote(request);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index 85b1d83..e11c339 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -31,6 +31,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.util.Daemon;
@@ -178,6 +179,8 @@ class SimulatedServerRpc implements RaftServerRpc {
server.getGroupInfo((GroupInfoRequest) request));
} else if (request instanceof SetConfigurationRequest) {
future = server.setConfigurationAsync((SetConfigurationRequest) request);
+ } else if (request instanceof TransferLeadershipRequest) {
+ future = server.transferLeadershipAsync((TransferLeadershipRequest) request);
} else {
future = server.submitClientRequestAsync(request);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 90d2690..e1b1b1f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -18,6 +18,7 @@
package org.apache.ratis.datastream;
import org.apache.ratis.BaseTest;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
import org.apache.ratis.server.DataStreamServer;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.DivisionInfo;
@@ -277,6 +278,11 @@ abstract class DataStreamBaseTest extends BaseTest {
}
@Override
+ public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
+ return null;
+ }
+
+ @Override
public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) {
final MyDivision d = getDivision(request.getRaftGroupId());
return d.getDataStreamMap()
@@ -301,6 +307,12 @@ abstract class DataStreamBaseTest extends BaseTest {
}
@Override
+ public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request)
+ throws IOException {
+ return null;
+ }
+
+ @Override
public GroupListReply getGroupList(GroupListRequest request) {
return null;
}