You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/02/04 12:02:38 UTC
[ratis] branch master updated: RATIS-1511. Add setLeaderElection grpc and client related code (#594)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d41d093 RATIS-1511. Add setLeaderElection grpc and client related code (#594)
d41d093 is described below
commit d41d0930c49c4cfcd8cd709519329e22fbbe61b4
Author: Yaolong Liu <ly...@163.com>
AuthorDate: Fri Feb 4 20:02:06 2022 +0800
RATIS-1511. Add setLeaderElection grpc and client related code (#594)
---
.../java/org/apache/ratis/client/RaftClient.java | 4 ++
.../client/api/LeaderElectionManagementApi.java | 27 +++++------
.../apache/ratis/client/impl/ClientProtoUtils.java | 31 +++++++++++++
.../client/impl/LeaderElectionManagementImpl.java | 52 ++++++++++++++++++++++
.../apache/ratis/client/impl/RaftClientImpl.java | 8 ++++
.../ratis/protocol/AdminAsynchronousProtocol.java | 2 +
.../org/apache/ratis/protocol/AdminProtocol.java | 2 +
...t.java => LeaderElectionManagementRequest.java} | 24 +++++-----
.../grpc/client/GrpcClientProtocolClient.java | 8 ++++
.../apache/ratis/grpc/client/GrpcClientRpc.java | 7 ++-
.../grpc/server/GrpcAdminProtocolService.java | 9 ++++
.../apache/ratis/netty/client/NettyClientRpc.java | 6 +++
.../apache/ratis/netty/server/NettyRpcService.java | 10 +++++
ratis-proto/src/main/proto/Grpc.proto | 3 ++
ratis-proto/src/main/proto/Netty.proto | 1 +
ratis-proto/src/main/proto/Raft.proto | 18 ++++++++
.../apache/ratis/server/impl/RaftServerImpl.java | 9 ++--
.../apache/ratis/server/impl/RaftServerProxy.java | 15 ++++++-
.../ratis/server/impl/LeaderElectionTests.java | 10 +----
.../server/simulation/SimulatedServerRpc.java | 3 ++
20 files changed, 209 insertions(+), 40 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 30e2879..6529732 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -23,6 +23,7 @@ import org.apache.ratis.client.api.AsyncApi;
import org.apache.ratis.client.api.BlockingApi;
import org.apache.ratis.client.api.DataStreamApi;
import org.apache.ratis.client.api.GroupManagementApi;
+import org.apache.ratis.client.api.LeaderElectionManagementApi;
import org.apache.ratis.client.api.MessageStreamApi;
import org.apache.ratis.client.api.SnapshotManagementApi;
import org.apache.ratis.client.impl.ClientImplUtils;
@@ -64,6 +65,9 @@ public interface RaftClient extends Closeable {
/** Get the {@link SnapshotManagementApi} for the given server. */
SnapshotManagementApi getSnapshotManagementApi(RaftPeerId server);
+ /** Get the {@link LeaderElectionManagementApi} for the given server. */
+ LeaderElectionManagementApi getLeaderElectionManagementApi(RaftPeerId server);
+
/** @return the {@link BlockingApi}. */
BlockingApi io();
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java b/ratis-client/src/main/java/org/apache/ratis/client/api/LeaderElectionManagementApi.java
similarity index 57%
copy from ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
copy to ratis-client/src/main/java/org/apache/ratis/client/api/LeaderElectionManagementApi.java
index 849466a..809fcdb 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/LeaderElectionManagementApi.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,21 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.protocol;
-
-import java.io.IOException;
+package org.apache.ratis.client.api;
-/** For server administration. */
-public interface AdminProtocol {
- GroupListReply getGroupList(GroupListRequest request) throws IOException;
+import org.apache.ratis.protocol.RaftClientReply;
- GroupInfoReply getGroupInfo(GroupInfoRequest request) throws IOException;
+import java.io.IOException;
- RaftClientReply groupManagement(GroupManagementRequest request) throws IOException;
+/**
+ * An API to support control leader election
+ * such as pause and resume election
+ */
+public interface LeaderElectionManagementApi {
- RaftClientReply snapshotManagement(SnapshotManagementRequest request) throws IOException;
+ /** pause leader election. */
+ RaftClientReply pause() throws IOException;
- RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException;
+ /** resume leader election. */
+ RaftClientReply resume() throws IOException;
- RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException;
-}
\ No newline at end of file
+}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 9478c89..7fc96f2 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -619,6 +619,37 @@ public interface ClientProtoUtils {
return b.build();
}
+ static LeaderElectionManagementRequest toLeaderElectionManagementRequest(LeaderElectionManagementRequestProto p) {
+ final RaftRpcRequestProto m = p.getRpcRequest();
+ final ClientId clientId = ClientId.valueOf(m.getRequestorId());
+ final RaftPeerId serverId = RaftPeerId.valueOf(m.getReplyId());
+ switch(p.getOpCase()) {
+ case PAUSE:
+ return LeaderElectionManagementRequest.newPause(clientId, serverId,
+ ProtoUtils.toRaftGroupId(m.getRaftGroupId()), m.getCallId());
+ case RESUME:
+ return LeaderElectionManagementRequest.newResume(clientId, serverId,
+ ProtoUtils.toRaftGroupId(m.getRaftGroupId()), m.getCallId());
+ default:
+ throw new IllegalArgumentException("Unexpected op " + p.getOpCase() + " in " + p);
+ }
+ }
+
+ static LeaderElectionManagementRequestProto toLeaderElectionManagementRequestProto(
+ LeaderElectionManagementRequest request) {
+ final LeaderElectionManagementRequestProto.Builder b = LeaderElectionManagementRequestProto.newBuilder()
+ .setRpcRequest(toRaftRpcRequestProtoBuilder(request));
+ final LeaderElectionManagementRequest.Pause pause = request.getPause();
+ if (pause != null) {
+ b.setPause(LeaderElectionPauseRequestProto.newBuilder().build());
+ }
+ final LeaderElectionManagementRequest.Resume resume = request.getResume();
+ if (resume != null) {
+ b.setResume(LeaderElectionResumeRequestProto.newBuilder().build());
+ }
+ return b.build();
+ }
+
static GroupInfoRequestProto toGroupInfoRequestProto(
GroupInfoRequest request) {
return GroupInfoRequestProto.newBuilder()
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/LeaderElectionManagementImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/LeaderElectionManagementImpl.java
new file mode 100644
index 0000000..76cea63
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/LeaderElectionManagementImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.client.impl;
+
+import org.apache.ratis.client.api.LeaderElectionManagementApi;
+import org.apache.ratis.protocol.LeaderElectionManagementRequest;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.CallId;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public class LeaderElectionManagementImpl implements LeaderElectionManagementApi {
+
+ private final RaftClientImpl client;
+ private final RaftPeerId server;
+
+ LeaderElectionManagementImpl(RaftPeerId server, RaftClientImpl client) {
+ this.server = Objects.requireNonNull(server, "server == null");
+ this.client = Objects.requireNonNull(client, "client == null");
+ }
+ @Override
+ public RaftClientReply pause() throws IOException {
+ final long callId = CallId.getAndIncrement();
+ return client.io().sendRequestWithRetry(() -> LeaderElectionManagementRequest.newPause(client.getId(),
+ server, client.getGroupId(), callId));
+ }
+
+ @Override
+ public RaftClientReply resume() throws IOException {
+ final long callId = CallId.getAndIncrement();
+ return client.io().sendRequestWithRetry(() -> LeaderElectionManagementRequest.newResume(client.getId(),
+ server, client.getGroupId(), callId));
+ }
+}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 9ccb256..f937880 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -21,6 +21,7 @@ import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.client.api.DataStreamApi;
+import org.apache.ratis.client.api.LeaderElectionManagementApi;
import org.apache.ratis.client.api.SnapshotManagementApi;
import org.apache.ratis.client.retry.ClientRetryEvent;
import org.apache.ratis.conf.RaftProperties;
@@ -142,6 +143,8 @@ public final class RaftClientImpl implements RaftClient {
private final Supplier<AdminImpl> adminApi;
private final ConcurrentMap<RaftPeerId, GroupManagementImpl> groupManagmenets = new ConcurrentHashMap<>();
private final ConcurrentMap<RaftPeerId, SnapshotManagementApi> snapshotManagemenet = new ConcurrentHashMap<>();
+ private final ConcurrentMap<RaftPeerId, LeaderElectionManagementApi>
+ leaderElectionManagement = new ConcurrentHashMap<>();
RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, RaftPeer primaryDataStreamServer,
RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy retryPolicy) {
@@ -256,6 +259,11 @@ public final class RaftClientImpl implements RaftClient {
}
@Override
+ public LeaderElectionManagementApi getLeaderElectionManagementApi(RaftPeerId server) {
+ return leaderElectionManagement.computeIfAbsent(server, id -> new LeaderElectionManagementImpl(id, this));
+ }
+
+ @Override
public BlockingImpl io() {
return blockingApi.get();
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
index 9c7d62c..38f72bc 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
@@ -30,6 +30,8 @@ public interface AdminAsynchronousProtocol {
CompletableFuture<RaftClientReply> snapshotManagementAsync(SnapshotManagementRequest request);
+ CompletableFuture<RaftClientReply> leaderElectionManagementAsync(LeaderElectionManagementRequest request);
+
CompletableFuture<RaftClientReply> setConfigurationAsync(
SetConfigurationRequest request) throws IOException;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
index 849466a..e66b296 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
@@ -29,6 +29,8 @@ public interface AdminProtocol {
RaftClientReply snapshotManagement(SnapshotManagementRequest request) throws IOException;
+ RaftClientReply leaderElectionManagement(LeaderElectionManagementRequest request) throws IOException;
+
RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException;
RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderElectionRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderElectionManagementRequest.java
similarity index 69%
rename from ratis-common/src/main/java/org/apache/ratis/protocol/LeaderElectionRequest.java
rename to ratis-common/src/main/java/org/apache/ratis/protocol/LeaderElectionManagementRequest.java
index bce0178..6ca6f50 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderElectionRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderElectionManagementRequest.java
@@ -19,7 +19,7 @@ package org.apache.ratis.protocol;
import org.apache.ratis.util.JavaUtils;
-public final class LeaderElectionRequest extends RaftClientRequest{
+public final class LeaderElectionManagementRequest extends RaftClientRequest{
public abstract static class Op {
}
@@ -37,23 +37,23 @@ public final class LeaderElectionRequest extends RaftClientRequest{
}
}
- public static LeaderElectionRequest newPause(ClientId clientId,
- RaftPeerId serverId, RaftGroupId groupId, long callId, long timeoutMs) {
- return new LeaderElectionRequest(clientId,
- serverId, groupId, callId, timeoutMs,new LeaderElectionRequest.Pause());
+ public static LeaderElectionManagementRequest newPause(ClientId clientId,
+ RaftPeerId serverId, RaftGroupId groupId, long callId) {
+ return new LeaderElectionManagementRequest(clientId,
+ serverId, groupId, callId, new LeaderElectionManagementRequest.Pause());
}
- public static LeaderElectionRequest newResume(ClientId clientId,
- RaftPeerId serverId, RaftGroupId groupId, long callId, long timeoutMs) {
- return new LeaderElectionRequest(clientId,
- serverId, groupId, callId, timeoutMs,new LeaderElectionRequest.Resume());
+ public static LeaderElectionManagementRequest newResume(ClientId clientId,
+ RaftPeerId serverId, RaftGroupId groupId, long callId) {
+ return new LeaderElectionManagementRequest(clientId,
+ serverId, groupId, callId, new LeaderElectionManagementRequest.Resume());
}
private final Op op;
- public LeaderElectionRequest(
- ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, long timeoutMs, Op op) {
- super(clientId, serverId, groupId, callId, readRequestType(), timeoutMs);
+ public LeaderElectionManagementRequest(
+ ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Op op) {
+ super(clientId, serverId, groupId, callId, false, readRequestType());
this.op = op;
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index ab68032..98b5fb4 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -34,6 +34,7 @@ import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.proto.RaftProtos.TransferLeadershipRequestProto;
import org.apache.ratis.proto.RaftProtos.SnapshotManagementRequestProto;
+import org.apache.ratis.proto.RaftProtos.LeaderElectionManagementRequestProto;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
@@ -217,6 +218,13 @@ public class GrpcClientProtocolClient implements Closeable {
.snapshotManagement(request));
}
+ RaftClientReplyProto leaderElectionManagement(
+ LeaderElectionManagementRequestProto request) throws IOException {
+ return blockingCall(() -> adminBlockingStub
+ .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+ .leaderElectionManagement(request));
+ }
+
private static RaftClientReplyProto blockingCall(
CheckedSupplier<RaftClientReplyProto, StatusRuntimeException> supplier
) throws IOException {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 5686a2b..8d62fdb 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -35,6 +35,7 @@ import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.proto.RaftProtos.TransferLeadershipRequestProto;
import org.apache.ratis.proto.RaftProtos.SnapshotManagementRequestProto;
+import org.apache.ratis.proto.RaftProtos.LeaderElectionManagementRequestProto;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.PeerProxyMap;
@@ -113,8 +114,12 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
return ClientProtoUtils.toRaftClientReply(proxy.transferLeadership(proto));
} else if (request instanceof SnapshotManagementRequest) {
final SnapshotManagementRequestProto proto = ClientProtoUtils.toSnapshotManagementRequestProto
- ((SnapshotManagementRequest)request);
+ ((SnapshotManagementRequest) request);
return ClientProtoUtils.toRaftClientReply(proxy.snapshotManagement(proto));
+ } else if (request instanceof LeaderElectionManagementRequest) {
+ final LeaderElectionManagementRequestProto proto = ClientProtoUtils.toLeaderElectionManagementRequestProto
+ ((LeaderElectionManagementRequest) request);
+ return ClientProtoUtils.toRaftClientReply(proxy.leaderElectionManagement(proto));
} else {
final CompletableFuture<RaftClientReply> f = sendRequest(request, proxy);
// TODO: timeout support
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java
index 6a4ff3f..feb7803 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java
@@ -24,6 +24,7 @@ import org.apache.ratis.protocol.AdminAsynchronousProtocol;
import org.apache.ratis.protocol.GroupInfoRequest;
import org.apache.ratis.protocol.GroupListRequest;
import org.apache.ratis.protocol.GroupManagementRequest;
+import org.apache.ratis.protocol.LeaderElectionManagementRequest;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.SnapshotManagementRequest;
import org.apache.ratis.protocol.TransferLeadershipRequest;
@@ -85,4 +86,12 @@ public class GrpcAdminProtocolService extends AdminProtocolServiceImplBase {
GrpcUtil.asyncCall(responseObserver, () -> protocol.snapshotManagementAsync(request),
ClientProtoUtils::toRaftClientReplyProto);
}
+
+ @Override
+ public void leaderElectionManagement(LeaderElectionManagementRequestProto proto,
+ StreamObserver<RaftClientReplyProto> responseObserver) {
+ final LeaderElectionManagementRequest request = ClientProtoUtils.toLeaderElectionManagementRequest(proto);
+ GrpcUtil.asyncCall(responseObserver, () -> protocol.leaderElectionManagementAsync(request),
+ ClientProtoUtils::toRaftClientReplyProto);
+ }
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
index 7ac04a2..c816e29 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
@@ -73,6 +73,12 @@ public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
(SnapshotManagementRequest) request);
b.setSnapshotManagementRequest(proto);
rpcRequest = proto.getRpcRequest();
+ } else if (request instanceof LeaderElectionManagementRequest) {
+ final RaftProtos.LeaderElectionManagementRequestProto proto =
+ ClientProtoUtils.toLeaderElectionManagementRequestProto(
+ (LeaderElectionManagementRequest) request);
+ b.setLeaderElectionManagementRequest(proto);
+ rpcRequest = proto.getRpcRequest();
} else {
final RaftClientRequestProto proto = ClientProtoUtils.toRaftClientRequestProto(request);
b.setRaftClientRequest(proto);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index b80649f..48a3592 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -200,6 +200,16 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy,
.setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(snapshotManagementReply))
.build();
+ case LEADERELECTIONMANAGEMENTREQUEST:
+ final LeaderElectionManagementRequestProto leaderElectionManagementRequest =
+ proto.getLeaderElectionManagementRequest();
+ rpcRequest = leaderElectionManagementRequest.getRpcRequest();
+ final RaftClientReply leaderElectionManagementReply = server.leaderElectionManagement(
+ ClientProtoUtils.toLeaderElectionManagementRequest(leaderElectionManagementRequest));
+ return RaftNettyServerReplyProto.newBuilder()
+ .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(leaderElectionManagementReply))
+ .build();
+
case APPENDENTRIESREQUEST:
final AppendEntriesRequestProto appendEntriesRequest = proto.getAppendEntriesRequest();
rpcRequest = appendEntriesRequest.getServerRequest();
diff --git a/ratis-proto/src/main/proto/Grpc.proto b/ratis-proto/src/main/proto/Grpc.proto
index 799b3ac..2e348f2 100644
--- a/ratis-proto/src/main/proto/Grpc.proto
+++ b/ratis-proto/src/main/proto/Grpc.proto
@@ -62,6 +62,9 @@ service AdminProtocolService {
rpc snapshotManagement(ratis.common.SnapshotManagementRequestProto)
returns(ratis.common.RaftClientReplyProto) {}
+ rpc leaderElectionManagement(ratis.common.LeaderElectionManagementRequestProto)
+ returns(ratis.common.RaftClientReplyProto) {}
+
rpc groupList(ratis.common.GroupListRequestProto)
returns(ratis.common.GroupListReplyProto) {}
diff --git a/ratis-proto/src/main/proto/Netty.proto b/ratis-proto/src/main/proto/Netty.proto
index ec8a0fc..6dc1343 100644
--- a/ratis-proto/src/main/proto/Netty.proto
+++ b/ratis-proto/src/main/proto/Netty.proto
@@ -41,6 +41,7 @@ message RaftNettyServerRequestProto {
ratis.common.TransferLeadershipRequestProto transferLeadershipRequest = 9;
ratis.common.StartLeaderElectionRequestProto startLeaderElectionRequest = 10;
ratis.common.SnapshotManagementRequestProto snapshotManagementRequest = 11;
+ ratis.common.LeaderElectionManagementRequestProto leaderElectionManagementRequest = 12;
}
}
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 17e7002..f49afcd 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -413,6 +413,24 @@ message TransferLeadershipRequestProto {
RaftPeerProto newLeader = 2;
}
+// leader election request
+message LeaderElectionManagementRequestProto {
+ RaftRpcRequestProto rpcRequest = 1;
+
+ oneof Op {
+ LeaderElectionPauseRequestProto pause = 2;
+ LeaderElectionResumeRequestProto resume = 3;
+ }
+}
+
+message LeaderElectionPauseRequestProto {
+
+}
+
+message LeaderElectionResumeRequestProto {
+
+}
+
// snapshot request
message SnapshotManagementRequestProto {
RaftRpcRequestProto rpcRequest = 1;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 080925d..f20d903 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1040,17 +1040,18 @@ class RaftServerImpl implements RaftServer.Division,
return snapshotRequestHandler;
}
- CompletableFuture<RaftClientReply> setLeaderElectionAsync(LeaderElectionRequest request) throws IOException {
- LOG.info("{} receive pauseLeaderElection {}", getMemberId(), request);
+ CompletableFuture<RaftClientReply> leaderElectionManagementAsync(LeaderElectionManagementRequest request)
+ throws IOException {
+ LOG.info("{} receive leaderElectionManagement request {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);
assertGroup(request.getRequestorId(), request.getRaftGroupId());
- final LeaderElectionRequest.Pause pause = request.getPause();
+ final LeaderElectionManagementRequest.Pause pause = request.getPause();
if (pause != null) {
getRole().setLeaderElectionPause(true);
return CompletableFuture.completedFuture(newSuccessReply(request));
}
- final LeaderElectionRequest.Resume resume = request.getResume();
+ final LeaderElectionManagementRequest.Resume resume = request.getResume();
if (resume != null) {
getRole().setLeaderElectionPause(false);
return CompletableFuture.completedFuture(newSuccessReply(request));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index d65c488..01d7100 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -546,9 +546,20 @@ class RaftServerProxy implements RaftServer {
.thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.takeSnapshotAsync(request)));
}
- public CompletableFuture<RaftClientReply> setLeaderElectionAsync(LeaderElectionRequest request) {
+ @Override
+ public RaftClientReply leaderElectionManagement(LeaderElectionManagementRequest request) throws IOException {
+ return RaftServerImpl.waitForReply(getId(), request, leaderElectionManagementAsync(request),
+ e -> RaftClientReply.newBuilder()
+ .setRequest(request)
+ .setException(e)
+ .build());
+ }
+
+ @Override
+ public CompletableFuture<RaftClientReply> leaderElectionManagementAsync(
+ LeaderElectionManagementRequest request) {
return getImplFuture(request.getRaftGroupId())
- .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.setLeaderElectionAsync(request)));
+ .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.leaderElectionManagementAsync(request)));
}
@Override
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index a5b48db..08e7d22 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -23,7 +23,6 @@ import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.RatisMetricRegistry;
-import org.apache.ratis.protocol.LeaderElectionRequest;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
@@ -31,7 +30,6 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
-import org.apache.ratis.rpc.CallId;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -424,9 +422,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
final RaftServerImpl f1 = (RaftServerImpl)followers.get(0);
try (final RaftClient client = cluster.createClient()) {
- final LeaderElectionRequest r = LeaderElectionRequest.newPause(
- client.getId(), f1.getId(),cluster.getGroupId(), CallId.getAndIncrement(), 3000);
- pauseLeaderReply = f1.getRaftServer().setLeaderElectionAsync(r).join();
+ pauseLeaderReply = client.getLeaderElectionManagementApi(f1.getId()).pause();
Assert.assertTrue(pauseLeaderReply.isSuccess());
client.io().send(new RaftTestUtil.SimpleMessage("message"));
RaftServer.Division newLeader = followers.get(0);
@@ -436,9 +432,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
Assert.assertTrue(reply.isSuccess());
JavaUtils.attempt(() -> Assert.assertEquals(leaderId, leader.getId()),
20, HUNDRED_MILLIS, "check leader id", LOG);
- final LeaderElectionRequest r1 = LeaderElectionRequest.newResume(
- client.getId(), f1.getId(),cluster.getGroupId(), CallId.getAndIncrement(), 3000);
- final RaftClientReply resumeLeaderReply = f1.getRaftServer().setLeaderElectionAsync(r1).join();
+ final RaftClientReply resumeLeaderReply = client.getLeaderElectionManagementApi(f1.getId()).resume();
Assert.assertTrue(resumeLeaderReply.isSuccess());
JavaUtils.attempt(() -> Assert.assertEquals(f1.getId(), cluster.getLeader().getId()),
20, HUNDRED_MILLIS, "check new leader", LOG);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index 0a58826..91905d5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -28,6 +28,7 @@ import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
import org.apache.ratis.protocol.GroupInfoRequest;
import org.apache.ratis.protocol.GroupListRequest;
import org.apache.ratis.protocol.GroupManagementRequest;
+import org.apache.ratis.protocol.LeaderElectionManagementRequest;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
@@ -195,6 +196,8 @@ class SimulatedServerRpc implements RaftServerRpc {
future = server.transferLeadershipAsync((TransferLeadershipRequest) request);
} else if (request instanceof SnapshotManagementRequest) {
future = server.snapshotManagementAsync((SnapshotManagementRequest) request);
+ } else if (request instanceof LeaderElectionManagementRequest) {
+ future = server.leaderElectionManagementAsync((LeaderElectionManagementRequest) request);
} else {
future = server.submitClientRequestAsync(request);
}