You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/01/13 12:38:41 UTC

[incubator-ratis] branch master updated: RATIS-1286. Move setConfiguration and transferLeadership to admin GRPC calls. (#394)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 544590d  RATIS-1286. Move setConfiguration and transferLeadership to admin GRPC calls. (#394)
544590d is described below

commit 544590d51b9ce9d8814958f8852a3c0d9d1ca67a
Author: Elek, Márton <el...@users.noreply.github.com>
AuthorDate: Wed Jan 13 13:38:33 2021 +0100

    RATIS-1286. Move setConfiguration and transferLeadership to admin GRPC calls. (#394)
---
 .../ratis/protocol/AdminAsynchronousProtocol.java      |  7 +++++++
 .../java/org/apache/ratis/protocol/AdminProtocol.java  |  4 ++++
 .../ratis/protocol/RaftClientAsynchronousProtocol.java |  5 -----
 .../org/apache/ratis/protocol/RaftClientProtocol.java  |  4 ----
 .../ratis/grpc/client/GrpcClientProtocolClient.java    |  5 ++---
 .../ratis/grpc/client/GrpcClientProtocolService.java   | 18 ------------------
 .../ratis/grpc/server/GrpcAdminProtocolService.java    | 18 ++++++++++++++++++
 ratis-proto/src/main/proto/Grpc.proto                  | 14 +++++++-------
 .../org/apache/ratis/server/impl/RaftServerImpl.java   |  6 +-----
 9 files changed, 39 insertions(+), 42 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
index 3e69bb8..c9b2f7e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.protocol;
 
+import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
 /** Asynchronous version of {@link AdminProtocol}. */
@@ -26,4 +27,10 @@ public interface AdminAsynchronousProtocol {
   CompletableFuture<GroupInfoReply> getGroupInfoAsync(GroupInfoRequest request);
 
   CompletableFuture<RaftClientReply> groupManagementAsync(GroupManagementRequest request);
+
+  CompletableFuture<RaftClientReply> setConfigurationAsync(
+      SetConfigurationRequest request) throws IOException;
+
+  CompletableFuture<RaftClientReply> transferLeadershipAsync(
+      TransferLeadershipRequest request) throws IOException;
 }
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
index ed9eed7..b10642c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
@@ -26,4 +26,8 @@ public interface AdminProtocol {
   GroupInfoReply getGroupInfo(GroupInfoRequest request) throws IOException;
 
   RaftClientReply groupManagement(GroupManagementRequest request) throws IOException;
+
+  RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException;
+
+  RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException;
 }
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
index ad43286..1a9f83c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
@@ -25,9 +25,4 @@ public interface RaftClientAsynchronousProtocol {
   CompletableFuture<RaftClientReply> submitClientRequestAsync(
       RaftClientRequest request) throws IOException;
 
-  CompletableFuture<RaftClientReply> setConfigurationAsync(
-      SetConfigurationRequest request) throws IOException;
-
-  CompletableFuture<RaftClientReply> transferLeadershipAsync(
-      TransferLeadershipRequest request) throws IOException;
 }
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
index 1ac00f3..44178b5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
@@ -21,8 +21,4 @@ import java.io.IOException;
 
 public interface RaftClientProtocol {
   RaftClientReply submitClientRequest(RaftClientRequest request) throws IOException;
-
-  RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException;
-
-  RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException;
 }
\ No newline at end of file
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 1d366f9..61bee8c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -172,10 +172,9 @@ public class GrpcClientProtocolClient implements Closeable {
         .groupInfo(request);
   }
 
-
   RaftClientReplyProto setConfiguration(
       SetConfigurationRequestProto request) throws IOException {
-    return blockingCall(() -> blockingStub
+    return blockingCall(() -> adminBlockingStub
         .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
         .setConfiguration(request));
   }
@@ -184,7 +183,7 @@ public class GrpcClientProtocolClient implements Closeable {
       TransferLeadershipRequestProto request) throws IOException {
     TimeDuration newDuration = requestTimeoutDuration.add(
         request.getRpcRequest().getTimeoutMs(), TimeUnit.MILLISECONDS);
-    return blockingCall(() -> blockingStub
+    return blockingCall(() -> adminBlockingStub
         .withDeadlineAfter(newDuration.getDuration(), newDuration.getUnit())
         .transferLeadership(request));
   }
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
index 0d08661..7cd8c08 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -26,8 +26,6 @@ import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
-import org.apache.ratis.proto.RaftProtos.TransferLeadershipRequestProto;
 import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.JavaUtils;
@@ -146,22 +144,6 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
   }
 
   @Override
-  public void setConfiguration(SetConfigurationRequestProto proto,
-      StreamObserver<RaftClientReplyProto> responseObserver) {
-    final SetConfigurationRequest request = ClientProtoUtils.toSetConfigurationRequest(proto);
-    GrpcUtil.asyncCall(responseObserver, () -> protocol.setConfigurationAsync(request),
-        ClientProtoUtils::toRaftClientReplyProto);
-  }
-
-  @Override
-  public void transferLeadership(TransferLeadershipRequestProto proto,
-      StreamObserver<RaftClientReplyProto> responseObserver) {
-    final TransferLeadershipRequest request = ClientProtoUtils.toTransferLeadershipRequest(proto);
-    GrpcUtil.asyncCall(responseObserver, () -> protocol.transferLeadershipAsync(request),
-        ClientProtoUtils::toRaftClientReplyProto);
-  }
-
-  @Override
   public StreamObserver<RaftClientRequestProto> ordered(StreamObserver<RaftClientReplyProto> responseObserver) {
     final OrderedRequestStreamObserver so = new OrderedRequestStreamObserver(responseObserver);
     orderedStreamObservers.putNew(so);
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java
index 199e38f..1156aeb 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java
@@ -24,6 +24,8 @@ import org.apache.ratis.protocol.AdminAsynchronousProtocol;
 import org.apache.ratis.protocol.GroupInfoRequest;
 import org.apache.ratis.protocol.GroupListRequest;
 import org.apache.ratis.protocol.GroupManagementRequest;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
@@ -58,4 +60,20 @@ public class GrpcAdminProtocolService extends AdminProtocolServiceImplBase {
     GrpcUtil.asyncCall(responseObserver, () -> protocol.getGroupInfoAsync(request),
         ClientProtoUtils::toGroupInfoReplyProto);
   }
+
+  @Override
+  public void setConfiguration(SetConfigurationRequestProto proto,
+      StreamObserver<RaftClientReplyProto> responseObserver) {
+    final SetConfigurationRequest request = ClientProtoUtils.toSetConfigurationRequest(proto);
+    GrpcUtil.asyncCall(responseObserver, () -> protocol.setConfigurationAsync(request),
+        ClientProtoUtils::toRaftClientReplyProto);
+  }
+
+  @Override
+  public void transferLeadership(TransferLeadershipRequestProto proto,
+      StreamObserver<RaftClientReplyProto> responseObserver) {
+    final TransferLeadershipRequest request = ClientProtoUtils.toTransferLeadershipRequest(proto);
+    GrpcUtil.asyncCall(responseObserver, () -> protocol.transferLeadershipAsync(request),
+        ClientProtoUtils::toRaftClientReplyProto);
+  }
 }
diff --git a/ratis-proto/src/main/proto/Grpc.proto b/ratis-proto/src/main/proto/Grpc.proto
index 14b6067..61a8347 100644
--- a/ratis-proto/src/main/proto/Grpc.proto
+++ b/ratis-proto/src/main/proto/Grpc.proto
@@ -24,13 +24,6 @@ package ratis.grpc;
 import "Raft.proto";
 
 service RaftClientProtocolService {
-  // A client-to-server RPC to set new raft configuration
-  rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
-      returns(ratis.common.RaftClientReplyProto) {}
-
-  rpc transferLeadership(ratis.common.TransferLeadershipRequestProto)
-      returns(ratis.common.RaftClientReplyProto) {}
-      
   // A client-to-server stream RPC to ordered async requests
   rpc ordered(stream ratis.common.RaftClientRequestProto)
       returns (stream ratis.common.RaftClientReplyProto) {}
@@ -55,6 +48,13 @@ service RaftServerProtocolService {
 }
 
 service AdminProtocolService {
+  // A client-to-server RPC to set new raft configuration
+  rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
+      returns(ratis.common.RaftClientReplyProto) {}
+
+  rpc transferLeadership(ratis.common.TransferLeadershipRequestProto)
+      returns(ratis.common.RaftClientReplyProto) {}
+
   // A client-to-server RPC to add a new group
   rpc groupManagement(ratis.common.GroupManagementRequestProto)
       returns(ratis.common.RaftClientReplyProto) {}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index d7ef2c3..f932fbb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -91,7 +91,7 @@ import com.codahale.metrics.Timer;
 
 class RaftServerImpl implements RaftServer.Division,
     RaftServerProtocol, RaftServerAsynchronousProtocol,
-    RaftClientProtocol, RaftClientAsynchronousProtocol {
+    RaftClientProtocol, RaftClientAsynchronousProtocol{
   private static final String CLASS_NAME = JavaUtils.getClassSimpleName(RaftServerImpl.class);
   static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
   static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
@@ -881,7 +881,6 @@ class RaftServerImpl implements RaftServer.Division,
     }
   }
 
-  @Override
   public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
     return waitForReply(request, transferLeadershipAsync(request));
   }
@@ -901,7 +900,6 @@ class RaftServerImpl implements RaftServer.Division,
     transferLeadership.finish(state.getLeaderId(), false);
   }
 
-  @Override
   public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request)
       throws IOException {
     LOG.info("{}: receive transferLeadership {}", getMemberId(), request);
@@ -944,7 +942,6 @@ class RaftServerImpl implements RaftServer.Division,
     }
   }
 
-  @Override
   public RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException {
     return waitForReply(request, setConfigurationAsync(request));
   }
@@ -952,7 +949,6 @@ class RaftServerImpl implements RaftServer.Division,
   /**
    * Handle a raft configuration change request from client.
    */
-  @Override
   public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfigurationRequest request) throws IOException {
     LOG.info("{}: receive setConfiguration {}", getMemberId(), request);
     assertLifeCycleState(LifeCycle.States.RUNNING);