You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/01/13 12:38:41 UTC
[incubator-ratis] branch master updated: RATIS-1286. Move setConfiguration and transferLeadership to admin GRPC calls. (#394)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 544590d RATIS-1286. Move setConfiguration and transferLeadership to admin GRPC calls. (#394)
544590d is described below
commit 544590d51b9ce9d8814958f8852a3c0d9d1ca67a
Author: Elek, Márton <el...@users.noreply.github.com>
AuthorDate: Wed Jan 13 13:38:33 2021 +0100
RATIS-1286. Move setConfiguration and transferLeadership to admin GRPC calls. (#394)
---
.../ratis/protocol/AdminAsynchronousProtocol.java | 7 +++++++
.../java/org/apache/ratis/protocol/AdminProtocol.java | 4 ++++
.../ratis/protocol/RaftClientAsynchronousProtocol.java | 5 -----
.../org/apache/ratis/protocol/RaftClientProtocol.java | 4 ----
.../ratis/grpc/client/GrpcClientProtocolClient.java | 5 ++---
.../ratis/grpc/client/GrpcClientProtocolService.java | 18 ------------------
.../ratis/grpc/server/GrpcAdminProtocolService.java | 18 ++++++++++++++++++
ratis-proto/src/main/proto/Grpc.proto | 14 +++++++-------
.../org/apache/ratis/server/impl/RaftServerImpl.java | 6 +-----
9 files changed, 39 insertions(+), 42 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
index 3e69bb8..c9b2f7e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.protocol;
+import java.io.IOException;
import java.util.concurrent.CompletableFuture;
/** Asynchronous version of {@link AdminProtocol}. */
@@ -26,4 +27,10 @@ public interface AdminAsynchronousProtocol {
CompletableFuture<GroupInfoReply> getGroupInfoAsync(GroupInfoRequest request);
CompletableFuture<RaftClientReply> groupManagementAsync(GroupManagementRequest request);
+
+ CompletableFuture<RaftClientReply> setConfigurationAsync(
+ SetConfigurationRequest request) throws IOException;
+
+ CompletableFuture<RaftClientReply> transferLeadershipAsync(
+ TransferLeadershipRequest request) throws IOException;
}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
index ed9eed7..b10642c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
@@ -26,4 +26,8 @@ public interface AdminProtocol {
GroupInfoReply getGroupInfo(GroupInfoRequest request) throws IOException;
RaftClientReply groupManagement(GroupManagementRequest request) throws IOException;
+
+ RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException;
+
+ RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException;
}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
index ad43286..1a9f83c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
@@ -25,9 +25,4 @@ public interface RaftClientAsynchronousProtocol {
CompletableFuture<RaftClientReply> submitClientRequestAsync(
RaftClientRequest request) throws IOException;
- CompletableFuture<RaftClientReply> setConfigurationAsync(
- SetConfigurationRequest request) throws IOException;
-
- CompletableFuture<RaftClientReply> transferLeadershipAsync(
- TransferLeadershipRequest request) throws IOException;
}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
index 1ac00f3..44178b5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
@@ -21,8 +21,4 @@ import java.io.IOException;
public interface RaftClientProtocol {
RaftClientReply submitClientRequest(RaftClientRequest request) throws IOException;
-
- RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException;
-
- RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException;
}
\ No newline at end of file
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 1d366f9..61bee8c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -172,10 +172,9 @@ public class GrpcClientProtocolClient implements Closeable {
.groupInfo(request);
}
-
RaftClientReplyProto setConfiguration(
SetConfigurationRequestProto request) throws IOException {
- return blockingCall(() -> blockingStub
+ return blockingCall(() -> adminBlockingStub
.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
.setConfiguration(request));
}
@@ -184,7 +183,7 @@ public class GrpcClientProtocolClient implements Closeable {
TransferLeadershipRequestProto request) throws IOException {
TimeDuration newDuration = requestTimeoutDuration.add(
request.getRpcRequest().getTimeoutMs(), TimeUnit.MILLISECONDS);
- return blockingCall(() -> blockingStub
+ return blockingCall(() -> adminBlockingStub
.withDeadlineAfter(newDuration.getDuration(), newDuration.getUnit())
.transferLeadership(request));
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
index 0d08661..7cd8c08 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -26,8 +26,6 @@ import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
-import org.apache.ratis.proto.RaftProtos.TransferLeadershipRequestProto;
import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
@@ -146,22 +144,6 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
}
@Override
- public void setConfiguration(SetConfigurationRequestProto proto,
- StreamObserver<RaftClientReplyProto> responseObserver) {
- final SetConfigurationRequest request = ClientProtoUtils.toSetConfigurationRequest(proto);
- GrpcUtil.asyncCall(responseObserver, () -> protocol.setConfigurationAsync(request),
- ClientProtoUtils::toRaftClientReplyProto);
- }
-
- @Override
- public void transferLeadership(TransferLeadershipRequestProto proto,
- StreamObserver<RaftClientReplyProto> responseObserver) {
- final TransferLeadershipRequest request = ClientProtoUtils.toTransferLeadershipRequest(proto);
- GrpcUtil.asyncCall(responseObserver, () -> protocol.transferLeadershipAsync(request),
- ClientProtoUtils::toRaftClientReplyProto);
- }
-
- @Override
public StreamObserver<RaftClientRequestProto> ordered(StreamObserver<RaftClientReplyProto> responseObserver) {
final OrderedRequestStreamObserver so = new OrderedRequestStreamObserver(responseObserver);
orderedStreamObservers.putNew(so);
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java
index 199e38f..1156aeb 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java
@@ -24,6 +24,8 @@ import org.apache.ratis.protocol.AdminAsynchronousProtocol;
import org.apache.ratis.protocol.GroupInfoRequest;
import org.apache.ratis.protocol.GroupListRequest;
import org.apache.ratis.protocol.GroupManagementRequest;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
@@ -58,4 +60,20 @@ public class GrpcAdminProtocolService extends AdminProtocolServiceImplBase {
GrpcUtil.asyncCall(responseObserver, () -> protocol.getGroupInfoAsync(request),
ClientProtoUtils::toGroupInfoReplyProto);
}
+
+ @Override
+ public void setConfiguration(SetConfigurationRequestProto proto,
+ StreamObserver<RaftClientReplyProto> responseObserver) {
+ final SetConfigurationRequest request = ClientProtoUtils.toSetConfigurationRequest(proto);
+ GrpcUtil.asyncCall(responseObserver, () -> protocol.setConfigurationAsync(request),
+ ClientProtoUtils::toRaftClientReplyProto);
+ }
+
+ @Override
+ public void transferLeadership(TransferLeadershipRequestProto proto,
+ StreamObserver<RaftClientReplyProto> responseObserver) {
+ final TransferLeadershipRequest request = ClientProtoUtils.toTransferLeadershipRequest(proto);
+ GrpcUtil.asyncCall(responseObserver, () -> protocol.transferLeadershipAsync(request),
+ ClientProtoUtils::toRaftClientReplyProto);
+ }
}
diff --git a/ratis-proto/src/main/proto/Grpc.proto b/ratis-proto/src/main/proto/Grpc.proto
index 14b6067..61a8347 100644
--- a/ratis-proto/src/main/proto/Grpc.proto
+++ b/ratis-proto/src/main/proto/Grpc.proto
@@ -24,13 +24,6 @@ package ratis.grpc;
import "Raft.proto";
service RaftClientProtocolService {
- // A client-to-server RPC to set new raft configuration
- rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
- returns(ratis.common.RaftClientReplyProto) {}
-
- rpc transferLeadership(ratis.common.TransferLeadershipRequestProto)
- returns(ratis.common.RaftClientReplyProto) {}
-
// A client-to-server stream RPC to ordered async requests
rpc ordered(stream ratis.common.RaftClientRequestProto)
returns (stream ratis.common.RaftClientReplyProto) {}
@@ -55,6 +48,13 @@ service RaftServerProtocolService {
}
service AdminProtocolService {
+ // A client-to-server RPC to set new raft configuration
+ rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
+ returns(ratis.common.RaftClientReplyProto) {}
+
+ rpc transferLeadership(ratis.common.TransferLeadershipRequestProto)
+ returns(ratis.common.RaftClientReplyProto) {}
+
// A client-to-server RPC to add a new group
rpc groupManagement(ratis.common.GroupManagementRequestProto)
returns(ratis.common.RaftClientReplyProto) {}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index d7ef2c3..f932fbb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -91,7 +91,7 @@ import com.codahale.metrics.Timer;
class RaftServerImpl implements RaftServer.Division,
RaftServerProtocol, RaftServerAsynchronousProtocol,
- RaftClientProtocol, RaftClientAsynchronousProtocol {
+ RaftClientProtocol, RaftClientAsynchronousProtocol{
private static final String CLASS_NAME = JavaUtils.getClassSimpleName(RaftServerImpl.class);
static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
@@ -881,7 +881,6 @@ class RaftServerImpl implements RaftServer.Division,
}
}
- @Override
public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
return waitForReply(request, transferLeadershipAsync(request));
}
@@ -901,7 +900,6 @@ class RaftServerImpl implements RaftServer.Division,
transferLeadership.finish(state.getLeaderId(), false);
}
- @Override
public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request)
throws IOException {
LOG.info("{}: receive transferLeadership {}", getMemberId(), request);
@@ -944,7 +942,6 @@ class RaftServerImpl implements RaftServer.Division,
}
}
- @Override
public RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException {
return waitForReply(request, setConfigurationAsync(request));
}
@@ -952,7 +949,6 @@ class RaftServerImpl implements RaftServer.Division,
/**
* Handle a raft configuration change request from client.
*/
- @Override
public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfigurationRequest request) throws IOException {
LOG.info("{}: receive setConfiguration {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);