You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/09/10 23:16:16 UTC
incubator-ratis git commit: RATIS-305. Replace the reinitialize() API with some group management APIs.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 89b1a1cd9 -> 09b099c71
RATIS-305. Replace the reinitialize() API with some group management APIs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/09b099c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/09b099c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/09b099c7
Branch: refs/heads/master
Commit: 09b099c71d4f112e4c7802cab1b48f2b84e0620b
Parents: 89b1a1c
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Mon Sep 10 16:15:53 2018 -0700
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Mon Sep 10 16:15:53 2018 -0700
----------------------------------------------------------------------
.../org/apache/ratis/client/RaftClient.java | 7 +-
.../ratis/client/impl/ClientProtoUtils.java | 41 ++--
.../ratis/client/impl/RaftClientImpl.java | 15 +-
.../protocol/AdminAsynchronousProtocol.java | 5 +-
.../apache/ratis/protocol/AdminProtocol.java | 4 +-
.../ratis/protocol/GroupManagementRequest.java | 92 ++++++++
.../ratis/protocol/RaftClientMessage.java | 4 +-
.../ratis/protocol/ReinitializeRequest.java | 37 ----
.../test/java/org/apache/ratis/BaseTest.java | 6 +-
.../org/apache/ratis/TestMultiRaftGroup.java | 4 +-
.../apache/ratis/grpc/client/GrpcClientRpc.java | 33 +--
.../grpc/client/RaftClientProtocolClient.java | 5 +-
.../ratis/grpc/server/AdminProtocolService.java | 11 +-
.../ratis/grpc/TestGroupManagementWithGrpc.java | 28 +++
.../grpc/TestReinitializationWithGrpc.java | 28 ---
...nedClientProtocolClientSideTranslatorPB.java | 8 +-
...nedClientProtocolServerSideTranslatorPB.java | 11 +-
.../ratis/hadooprpc/client/HadoopClientRpc.java | 4 +-
.../TestGroupManagementWithHadoopRpc.java | 28 +++
.../TestReinitializationWithHadoopRpc.java | 28 ---
.../ratis/netty/client/NettyClientRpc.java | 10 +-
.../ratis/netty/server/NettyRpcService.java | 8 +-
.../netty/TestGroupManagementWithNetty.java | 28 +++
.../netty/TestReinitializationWithNetty.java | 28 ---
ratis-proto-shaded/src/main/proto/GRpc.proto | 4 +-
ratis-proto-shaded/src/main/proto/Hadoop.proto | 2 +-
ratis-proto-shaded/src/main/proto/Netty.proto | 2 +-
ratis-proto-shaded/src/main/proto/Raft.proto | 18 +-
.../ratis/server/impl/RaftServerProxy.java | 65 ++++--
.../org/apache/ratis/RaftExceptionBaseTest.java | 4 +-
.../server/impl/GroupManagementBaseTest.java | 218 +++++++++++++++++++
.../server/impl/ReinitializationBaseTest.java | 218 -------------------
.../server/simulation/SimulatedServerRpc.java | 4 +-
.../TestGroupManagementWithSimulatedRpc.java | 28 +++
.../TestReinitializationWithSimulatedRpc.java | 28 ---
35 files changed, 592 insertions(+), 472 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 5562f59..ead9155 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -89,8 +89,11 @@ public interface RaftClient extends Closeable {
/** Send set configuration request to the raft service. */
RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException;
- /** Send reinitialize request to the given server (not the raft service). */
- RaftClientReply reinitialize(RaftGroup newGroup, RaftPeerId server) throws IOException;
+ /** Send groupAdd request to the given server (not the raft service). */
+ RaftClientReply groupAdd(RaftGroup newGroup, RaftPeerId server) throws IOException;
+
+ /** Send groupRemove request to the given server (not the raft service). */
+ RaftClientReply groupRemove(RaftGroupId groupId, RaftPeerId server) throws IOException;
/** Send serverInformation request to the given server.*/
RaftClientReply serverInformation(RaftPeerId server) throws IOException;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 1c41a3b..7065dd4 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -293,15 +293,20 @@ public interface ClientProtoUtils {
.build();
}
- static ReinitializeRequest toReinitializeRequest(
- ReinitializeRequestProto p) {
+ static GroupManagementRequest toGroupManagementRequest(GroupManagementRequestProto p) {
final RaftRpcRequestProto m = p.getRpcRequest();
- return new ReinitializeRequest(
- ClientId.valueOf(m.getRequestorId()),
- RaftPeerId.valueOf(m.getReplyId()),
- ProtoUtils.toRaftGroupId(m.getRaftGroupId()),
- m.getCallId(),
- ProtoUtils.toRaftGroup(p.getGroup()));
+ final ClientId clientId = ClientId.valueOf(m.getRequestorId());
+ final RaftPeerId serverId = RaftPeerId.valueOf(m.getReplyId());
+ switch(p.getOpCase()) {
+ case GROUPADD:
+ return GroupManagementRequest.newAdd(clientId, serverId, m.getCallId(),
+ ProtoUtils.toRaftGroup(p.getGroupAdd().getGroup()));
+ case GROUPREMOVE:
+ return GroupManagementRequest.newRemove(clientId, serverId, m.getCallId(),
+ ProtoUtils.toRaftGroupId(p.getGroupRemove().getGroupId()));
+ default:
+ throw new IllegalArgumentException("Unexpected op " + p.getOpCase() + " in " + p);
+ }
}
static ServerInformationRequest toServerInformationRequest(
@@ -314,12 +319,20 @@ public interface ClientProtoUtils {
m.getCallId());
}
- static ReinitializeRequestProto toReinitializeRequestProto(
- ReinitializeRequest request) {
- return ReinitializeRequestProto.newBuilder()
- .setRpcRequest(toRaftRpcRequestProtoBuilder(request))
- .setGroup(ProtoUtils.toRaftGroupProtoBuilder(request.getGroup()))
- .build();
+ static GroupManagementRequestProto toGroupManagementRequestProto(GroupManagementRequest request) {
+ final GroupManagementRequestProto.Builder b = GroupManagementRequestProto.newBuilder()
+ .setRpcRequest(toRaftRpcRequestProtoBuilder(request));
+ final GroupManagementRequest.Add add = request.getAdd();
+ if (add != null) {
+ b.setGroupAdd(GroupAddRequestProto.newBuilder().setGroup(
+ ProtoUtils.toRaftGroupProtoBuilder(add.getGroup())).build());
+ }
+ final GroupManagementRequest.Remove remove = request.getRemove();
+ if (remove != null) {
+ b.setGroupRemove(GroupRemoveRequestProto.newBuilder().setGroupId(
+ ProtoUtils.toRaftGroupIdProtoBuilder(remove.getGroupId())).build());
+ }
+ return b.build();
}
static ServerInformationRequestProto toServerInformationRequestProto(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index eb78463..9419c7f 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -206,15 +206,22 @@ final class RaftClientImpl implements RaftClient {
}
@Override
- public RaftClientReply reinitialize(RaftGroup newGroup, RaftPeerId server)
- throws IOException {
+ public RaftClientReply groupAdd(RaftGroup newGroup, RaftPeerId server) throws IOException {
Objects.requireNonNull(newGroup, "newGroup == null");
Objects.requireNonNull(server, "server == null");
final long callId = nextCallId();
addServers(newGroup.getPeers().stream());
- return sendRequest(new ReinitializeRequest(
- clientId, server, groupId, callId, newGroup));
+ return sendRequest(GroupManagementRequest.newAdd(clientId, server, callId, newGroup));
+ }
+
+ @Override
+ public RaftClientReply groupRemove(RaftGroupId groupId, RaftPeerId server) throws IOException {
+ Objects.requireNonNull(groupId, "groupId == null");
+ Objects.requireNonNull(server, "server == null");
+
+ final long callId = nextCallId();
+ return sendRequest(GroupManagementRequest.newRemove(clientId, server, callId, groupId));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
index 4907386..6c545a8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
@@ -22,9 +22,8 @@ import java.util.concurrent.CompletableFuture;
/** Asynchronous version of {@link AdminProtocol}. */
public interface AdminAsynchronousProtocol {
- CompletableFuture<RaftClientReply> reinitializeAsync(
- ReinitializeRequest request) throws IOException;
-
CompletableFuture<ServerInformationReply> getInfoAsync(
ServerInformationRequest request) throws IOException;
+
+ CompletableFuture<RaftClientReply> groupManagementAsync(GroupManagementRequest request);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
index a480953..03bb266 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
@@ -21,7 +21,7 @@ import java.io.IOException;
/** For server administration. */
public interface AdminProtocol {
- RaftClientReply reinitialize(ReinitializeRequest request) throws IOException;
-
ServerInformationReply getInfo(ServerInformationRequest request) throws IOException;
+
+ RaftClientReply groupManagement(GroupManagementRequest request) throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
new file mode 100644
index 0000000..8577548
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+public class GroupManagementRequest extends RaftClientRequest {
+ public static abstract class Op {
+ public abstract RaftGroupId getGroupId();
+ }
+
+ public static class Add extends Op {
+ private final RaftGroup group;
+
+ public Add(RaftGroup group) {
+ this.group = group;
+ }
+
+ @Override
+ public RaftGroupId getGroupId() {
+ return getGroup().getGroupId();
+ }
+
+ public RaftGroup getGroup() {
+ return group;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":" + getGroup();
+ }
+ }
+
+ public static class Remove extends Op {
+ private final RaftGroupId groupId;
+
+ public Remove(RaftGroupId groupId) {
+ this.groupId = groupId;
+ }
+
+ @Override
+ public RaftGroupId getGroupId() {
+ return groupId;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":" + getGroupId();
+ }
+ }
+
+ public static GroupManagementRequest newAdd(ClientId clientId, RaftPeerId serverId, long callId, RaftGroup group) {
+ return new GroupManagementRequest(clientId, serverId, callId, new Add(group));
+ }
+
+ public static GroupManagementRequest newRemove(ClientId clientId, RaftPeerId serverId, long callId, RaftGroupId groupId) {
+ return new GroupManagementRequest(clientId, serverId, callId, new Remove(groupId));
+ }
+
+ private final Op op;
+
+ private GroupManagementRequest(ClientId clientId, RaftPeerId serverId, long callId, Op op) {
+ super(clientId, serverId, op.getGroupId(), callId);
+ this.op = op;
+ }
+
+ public Add getAdd() {
+ return op instanceof Add? (Add)op: null;
+ }
+
+ public Remove getRemove() {
+ return op instanceof Remove? (Remove)op: null;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + ", " + op;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
index 07354d4..8a2e4a9 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
@@ -54,7 +54,7 @@ public abstract class RaftClientMessage implements RaftRpcMessage {
@Override
public String toString() {
- return getClass().getSimpleName() + "(" + clientId + "->" + serverId
- + ") in " + groupId;
+ return getClass().getSimpleName() + ":" + clientId + "->" + serverId
+ + (groupId != null? "@" + groupId: "");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
deleted file mode 100644
index b0e69af..0000000
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.protocol;
-
-public class ReinitializeRequest extends RaftClientRequest {
- private final RaftGroup group;
-
- public ReinitializeRequest(ClientId clientId, RaftPeerId serverId,
- RaftGroupId groupId, long callId, RaftGroup group) {
- super(clientId, serverId, groupId, callId);
- this.group = group;
- }
-
- public RaftGroup getGroup() {
- return group;
- }
-
- @Override
- public String toString() {
- return super.toString() + ", " + getGroup();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index e487841..7efd9a7 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -98,12 +98,16 @@ public abstract class BaseTest {
String description, CheckedRunnable<?> testCode,
Class<? extends Throwable> exceptedThrowableClass, Logger log,
Class<? extends Throwable>... exceptedCauseClasses) {
+ boolean caught = false;
try {
testCode.run();
- Assert.fail("The test \"" + description + "\" does not throw anything.");
} catch (Throwable t) {
+ caught = true;
assertThrowable(description, t, exceptedThrowableClass, log, exceptedCauseClasses);
}
+ if (!caught) {
+ Assert.fail("The test \"" + description + "\" does not throw anything.");
+ }
}
public void testFailureCase(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java
index b3b0998..030badd 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java
@@ -25,7 +25,7 @@ import org.apache.ratis.examples.arithmetic.ArithmeticStateMachine;
import org.apache.ratis.examples.arithmetic.TestArithmetic;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.ReinitializationBaseTest;
+import org.apache.ratis.server.impl.GroupManagementBaseTest;
import org.apache.ratis.util.CheckedBiConsumer;
import org.apache.ratis.util.LogUtils;
import org.junit.Test;
@@ -70,7 +70,7 @@ public class TestMultiRaftGroup extends BaseTest {
}
};
- ReinitializationBaseTest.runTestReinitializeMultiGroups(
+ GroupManagementBaseTest.runMultiGroupTest(
cluster, idIndex, chosen, checker);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 8c26c7f..160ae16 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -22,11 +22,18 @@ import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.*;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupManagementRequest;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.ServerInformationRequest;
+import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos;
+import org.apache.ratis.shaded.proto.RaftProtos.GroupManagementRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
@@ -39,8 +46,6 @@ import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import static org.apache.ratis.client.impl.ClientProtoUtils.*;
-
public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClient> {
public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class);
@@ -71,19 +76,17 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
throws IOException {
final RaftPeerId serverId = request.getServerId();
final RaftClientProtocolClient proxy = getProxies().getProxy(serverId);
- if (request instanceof ReinitializeRequest) {
- RaftProtos.ReinitializeRequestProto proto =
- toReinitializeRequestProto((ReinitializeRequest) request);
- return toRaftClientReply(proxy.reinitialize(proto));
+ if (request instanceof GroupManagementRequest) {
+ final GroupManagementRequestProto proto = ClientProtoUtils.toGroupManagementRequestProto((GroupManagementRequest)request);
+ return ClientProtoUtils.toRaftClientReply(proxy.groupAdd(proto));
} else if (request instanceof SetConfigurationRequest) {
- SetConfigurationRequestProto setConf =
- toSetConfigurationRequestProto((SetConfigurationRequest) request);
- return toRaftClientReply(proxy.setConfiguration(setConf));
+ final SetConfigurationRequestProto setConf = ClientProtoUtils.toSetConfigurationRequestProto(
+ (SetConfigurationRequest) request);
+ return ClientProtoUtils.toRaftClientReply(proxy.setConfiguration(setConf));
} else if (request instanceof ServerInformationRequest){
- RaftProtos.ServerInformationRequestProto proto =
- toServerInformationRequestProto((ServerInformationRequest) request);
- return ClientProtoUtils.toServerInformationReply(
- proxy.serverInformation(proto));
+ final ServerInformationRequestProto proto = ClientProtoUtils.toServerInformationRequestProto(
+ (ServerInformationRequest) request);
+ return ClientProtoUtils.toServerInformationReply(proxy.serverInformation(proto));
} else {
final CompletableFuture<RaftClientReply> f = sendRequest(request, proxy);
// TODO: timeout support
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
index 2d095ab..11f2676 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
@@ -97,11 +97,10 @@ public class RaftClientProtocolClient implements Closeable {
channel.shutdownNow();
}
- RaftClientReplyProto reinitialize(
- ReinitializeRequestProto request) throws IOException {
+ RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException {
return blockingCall(() -> adminBlockingStub
.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
- .reinitialize(request));
+ .groupManagement(request));
}
ServerInformationReplyProto serverInformation(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
index c7ba297..d65abd0 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
@@ -20,11 +20,11 @@ package org.apache.ratis.grpc.server;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.grpc.RaftGrpcUtil;
import org.apache.ratis.protocol.AdminAsynchronousProtocol;
-import org.apache.ratis.protocol.ReinitializeRequest;
+import org.apache.ratis.protocol.GroupManagementRequest;
import org.apache.ratis.protocol.ServerInformationRequest;
import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.GroupManagementRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationRequestProto;
import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceImplBase;
@@ -37,10 +37,9 @@ public class AdminProtocolService extends AdminProtocolServiceImplBase {
}
@Override
- public void reinitialize(ReinitializeRequestProto proto,
- StreamObserver<RaftClientReplyProto> responseObserver) {
- final ReinitializeRequest request = ClientProtoUtils.toReinitializeRequest(proto);
- RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.reinitializeAsync(request),
+ public void groupManagement(GroupManagementRequestProto proto, StreamObserver<RaftClientReplyProto> responseObserver) {
+ final GroupManagementRequest request = ClientProtoUtils.toGroupManagementRequest(proto);
+ RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.groupManagementAsync(request),
ClientProtoUtils::toRaftClientReplyProto);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java
new file mode 100644
index 0000000..0b5e2a9
--- /dev/null
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.server.impl.GroupManagementBaseTest;
+
+public class TestGroupManagementWithGrpc extends GroupManagementBaseTest {
+ @Override
+ public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() {
+ return MiniRaftClusterWithGRpc.FACTORY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java
deleted file mode 100644
index 27cbf1e..0000000
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc;
-
-import org.apache.ratis.MiniRaftCluster;
-import org.apache.ratis.server.impl.ReinitializationBaseTest;
-
-public class TestReinitializationWithGrpc extends ReinitializationBaseTest {
- @Override
- public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() {
- return MiniRaftClusterWithGRpc.FACTORY;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
index c47a69b..43fcca1 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
@@ -23,7 +23,7 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.hadooprpc.Proxy;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.ReinitializeRequest;
+import org.apache.ratis.protocol.GroupManagementRequest;
import org.apache.ratis.protocol.ServerInformationRequest;
import org.apache.ratis.protocol.ServerInformationReply;
import org.apache.ratis.protocol.SetConfigurationRequest;
@@ -67,11 +67,11 @@ public class CombinedClientProtocolClientSideTranslatorPB
}
@Override
- public RaftClientReply reinitialize(ReinitializeRequest request) throws IOException {
+ public RaftClientReply groupManagement(GroupManagementRequest request) throws IOException {
return handleRequest(request,
- ClientProtoUtils::toReinitializeRequestProto,
+ ClientProtoUtils::toGroupManagementRequestProto,
ClientProtoUtils::toRaftClientReply,
- p -> getProtocol().reinitialize(null, p));
+ p -> getProtocol().groupManagement(null, p));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java
index 5b88208..c47e5b0 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java
@@ -28,7 +28,7 @@ import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.GroupManagementRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationReplyProto;
@@ -69,13 +69,12 @@ public class CombinedClientProtocolServerSideTranslatorPB
}
@Override
- public RaftClientReplyProto reinitialize(
- RpcController controller, ReinitializeRequestProto proto)
+ public RaftClientReplyProto groupManagement(RpcController controller, GroupManagementRequestProto proto)
throws ServiceException {
- final ReinitializeRequest request;
+ final GroupManagementRequest request;
try {
- request = ClientProtoUtils.toReinitializeRequest(proto);
- final RaftClientReply reply = impl.reinitialize(request);
+ request = ClientProtoUtils.toGroupManagementRequest(proto);
+ final RaftClientReply reply = impl.groupManagement(request);
return ClientProtoUtils.toRaftClientReplyProto(reply);
} catch(IOException ioe) {
throw new ServiceException(ioe);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
index e35875c..38b401e 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
@@ -38,8 +38,8 @@ public class HadoopClientRpc extends RaftClientRpcWithProxy<CombinedClientProtoc
final CombinedClientProtocolClientSideTranslatorPB proxy =
getProxies().getProxy(serverId);
try {
- if (request instanceof ReinitializeRequest) {
- return proxy.reinitialize((ReinitializeRequest) request);
+ if (request instanceof GroupManagementRequest) {
+ return proxy.groupManagement((GroupManagementRequest) request);
} else if (request instanceof SetConfigurationRequest) {
return proxy.setConfiguration((SetConfigurationRequest) request);
} else if (request instanceof ServerInformationRequest) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestGroupManagementWithHadoopRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestGroupManagementWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestGroupManagementWithHadoopRpc.java
new file mode 100644
index 0000000..af0074f
--- /dev/null
+++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestGroupManagementWithHadoopRpc.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.server.impl.GroupManagementBaseTest;
+
+public class TestGroupManagementWithHadoopRpc extends GroupManagementBaseTest {
+ @Override
+ public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() {
+ return MiniRaftClusterWithHadoopRpc.FACTORY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java
deleted file mode 100644
index 6efb012..0000000
--- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.hadooprpc;
-
-import org.apache.ratis.MiniRaftCluster;
-import org.apache.ratis.server.impl.ReinitializationBaseTest;
-
-public class TestReinitializationWithHadoopRpc extends ReinitializationBaseTest {
- @Override
- public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() {
- return MiniRaftClusterWithHadoopRpc.FACTORY;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
index f4c1f11..3bc2608 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
@@ -24,7 +24,7 @@ import org.apache.ratis.protocol.*;
import org.apache.ratis.shaded.proto.RaftProtos;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.GroupManagementRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto;
@@ -42,10 +42,10 @@ public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
final RaftNettyServerRequestProto.Builder b = RaftNettyServerRequestProto.newBuilder();
final RaftRpcRequestProto rpcRequest;
- if (request instanceof ReinitializeRequest) {
- final ReinitializeRequestProto proto = ClientProtoUtils.toReinitializeRequestProto(
- (ReinitializeRequest)request);
- b.setReinitializeRequest(proto);
+ if (request instanceof GroupManagementRequest) {
+ final GroupManagementRequestProto proto = ClientProtoUtils.toGroupManagementRequestProto(
+ (GroupManagementRequest)request);
+ b.setGroupManagementRequest(proto);
rpcRequest = proto.getRpcRequest();
} else if (request instanceof SetConfigurationRequest) {
final SetConfigurationRequestProto proto = ClientProtoUtils.toSetConfigurationRequestProto(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 45c0b77..7dba943 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -193,11 +193,11 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy,
.setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply))
.build();
}
- case REINITIALIZEREQUEST: {
- final ReinitializeRequestProto request = proto.getReinitializeRequest();
+ case GROUPMANAGEMENTREQUEST: {
+ final GroupManagementRequestProto request = proto.getGroupManagementRequest();
rpcRequest = request.getRpcRequest();
- final RaftClientReply reply = server.reinitialize(
- ClientProtoUtils.toReinitializeRequest(request));
+ final RaftClientReply reply = server.groupManagement(
+ ClientProtoUtils.toGroupManagementRequest(request));
return RaftNettyServerReplyProto.newBuilder()
.setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply))
.build();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-netty/src/test/java/org/apache/ratis/netty/TestGroupManagementWithNetty.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestGroupManagementWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestGroupManagementWithNetty.java
new file mode 100644
index 0000000..e049e32
--- /dev/null
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestGroupManagementWithNetty.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.server.impl.GroupManagementBaseTest;
+
+public class TestGroupManagementWithNetty extends GroupManagementBaseTest {
+ @Override
+ public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() {
+ return MiniRaftClusterWithNetty.FACTORY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java
deleted file mode 100644
index c378749..0000000
--- a/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.netty;
-
-import org.apache.ratis.MiniRaftCluster;
-import org.apache.ratis.server.impl.ReinitializationBaseTest;
-
-public class TestReinitializationWithNetty extends ReinitializationBaseTest {
- @Override
- public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() {
- return MiniRaftClusterWithNetty.FACTORY;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-proto-shaded/src/main/proto/GRpc.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/GRpc.proto b/ratis-proto-shaded/src/main/proto/GRpc.proto
index 375079f..d7e550e 100644
--- a/ratis-proto-shaded/src/main/proto/GRpc.proto
+++ b/ratis-proto-shaded/src/main/proto/GRpc.proto
@@ -45,8 +45,8 @@ service RaftServerProtocolService {
}
service AdminProtocolService {
- // A client-to-server RPC to reinitialize the server
- rpc reinitialize(ratis.common.ReinitializeRequestProto)
+ // A client-to-server RPC to manage groups (add a new group or remove an existing group)
+ rpc groupManagement(ratis.common.GroupManagementRequestProto)
returns(ratis.common.RaftClientReplyProto) {}
rpc serverInformation(ratis.common.ServerInformationRequestProto)
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-proto-shaded/src/main/proto/Hadoop.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Hadoop.proto b/ratis-proto-shaded/src/main/proto/Hadoop.proto
index 872c455..0d6107e 100644
--- a/ratis-proto-shaded/src/main/proto/Hadoop.proto
+++ b/ratis-proto-shaded/src/main/proto/Hadoop.proto
@@ -31,7 +31,7 @@ service CombinedClientProtocolService {
rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
returns(ratis.common.RaftClientReplyProto);
- rpc reinitialize(ratis.common.ReinitializeRequestProto)
+ rpc groupManagement(ratis.common.GroupManagementRequestProto)
returns(ratis.common.RaftClientReplyProto);
rpc serverInformation(ratis.common.ServerInformationRequestProto)
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-proto-shaded/src/main/proto/Netty.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Netty.proto b/ratis-proto-shaded/src/main/proto/Netty.proto
index c034dd5..40aa498 100644
--- a/ratis-proto-shaded/src/main/proto/Netty.proto
+++ b/ratis-proto-shaded/src/main/proto/Netty.proto
@@ -35,7 +35,7 @@ message RaftNettyServerRequestProto {
ratis.common.InstallSnapshotRequestProto installSnapshotRequest = 3;
ratis.common.RaftClientRequestProto raftClientRequest = 4;
ratis.common.SetConfigurationRequestProto setConfigurationRequest = 5;
- ratis.common.ReinitializeRequestProto reinitializeRequest = 6;
+ ratis.common.GroupManagementRequestProto groupManagementRequest = 6;
ratis.common.ServerInformationRequestProto serverInformationRequest = 7;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index 039a3f6..3d7eab3 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -244,10 +244,22 @@ message SetConfigurationRequestProto {
repeated RaftPeerProto peers = 2;
}
-// reinitialize request
-message ReinitializeRequestProto {
+// A request to add a new group
+message GroupAddRequestProto {
+ RaftGroupProto group = 1; // the group to be added.
+}
+
+message GroupRemoveRequestProto {
+ RaftGroupIdProto groupId = 1; // the group to be removed.
+}
+
+message GroupManagementRequestProto {
RaftRpcRequestProto rpcRequest = 1;
- RaftGroupProto group = 2; // the target group.
+
+ oneof Op {
+ GroupAddRequestProto groupAdd = 2;
+ GroupRemoveRequestProto groupRemove = 3;
+ }
}
// server info request
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 27ec67f..acec50a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -39,6 +39,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -47,7 +48,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public class RaftServerProxy implements RaftServer {
@@ -148,7 +148,6 @@ public class RaftServerProxy implements RaftServer {
private final ServerFactory factory;
private final ImplMap impls = new ImplMap();
- private final AtomicReference<ReinitializeRequest> reinitializeRequest = new AtomicReference<>();
RaftServerProxy(RaftPeerId id, StateMachine.Registry stateMachineRegistry,
RaftProperties properties, Parameters parameters) {
@@ -296,24 +295,37 @@ public class RaftServerProxy implements RaftServer {
}
@Override
- public RaftClientReply reinitialize(ReinitializeRequest request) throws IOException {
- return RaftServerImpl.waitForReply(getId(), request, reinitializeAsync(request),
+ public RaftClientReply groupManagement(GroupManagementRequest request) throws IOException {
+ return RaftServerImpl.waitForReply(getId(), request, groupManagementAsync(request),
e -> new RaftClientReply(request, e, null));
}
@Override
- public CompletableFuture<RaftClientReply> reinitializeAsync(
- ReinitializeRequest request) throws IOException {
- LOG.info("{}: reinitialize* {}", getId(), request);
- if (!reinitializeRequest.compareAndSet(null, request)) {
- throw new IOException("Another reinitialize is already in progress.");
+ public CompletableFuture<RaftClientReply> groupManagementAsync(GroupManagementRequest request) {
+ final RaftGroupId groupId = request.getRaftGroupId();
+ if (groupId == null) {
+ return JavaUtils.completeExceptionally(new GroupMismatchException(
+ getId() + ": Request group id == null"));
}
- final RaftGroupId oldGroupId = request.getRaftGroupId();
- return getImplFuture(oldGroupId)
- .thenAcceptAsync(RaftServerImpl::shutdown)
- .thenAccept(_1 -> impls.remove(oldGroupId))
- .thenCompose(_1 -> impls.addNew(request.getGroup()))
- .thenApply(newImpl -> {
+ final GroupManagementRequest.Add add = request.getAdd();
+ if (add != null) {
+ return groupdAddAsync(request, add.getGroup());
+ }
+ final GroupManagementRequest.Remove remove = request.getRemove();
+ if (remove != null) {
+ return groupRemoveAsync(request, remove.getGroupId());
+ }
+ return JavaUtils.completeExceptionally(new UnsupportedOperationException(
+ getId() + ": Request not supported " + request));
+ }
+
+ private CompletableFuture<RaftClientReply> groupdAddAsync(GroupManagementRequest request, RaftGroup newGroup) {
+ if (!request.getRaftGroupId().equals(newGroup.getGroupId())) {
+ return JavaUtils.completeExceptionally(new GroupMismatchException(
+ getId() + ": Request group id (" + request.getRaftGroupId() + ") does not match the new group " + newGroup));
+ }
+ return impls.addNew(newGroup)
+ .thenApplyAsync(newImpl -> {
LOG.debug("{}: newImpl = {}", getId(), newImpl);
final boolean started = newImpl.start();
Preconditions.assertTrue(started, () -> getId()+ ": failed to start a new impl: " + newImpl);
@@ -321,14 +333,29 @@ public class RaftServerProxy implements RaftServer {
})
.whenComplete((_1, throwable) -> {
if (throwable != null) {
- impls.remove(request.getGroup().getGroupId());
- LOG.warn(getId() + ": Failed reinitialize* " + request, throwable);
+ impls.remove(newGroup.getGroupId());
+ LOG.warn(getId() + ": Failed groupAdd* " + request, throwable);
}
-
- reinitializeRequest.set(null);
});
}
+ private CompletableFuture<RaftClientReply> groupRemoveAsync(RaftClientRequest request, RaftGroupId groupId) {
+ if (!request.getRaftGroupId().equals(groupId)) {
+ return JavaUtils.completeExceptionally(new GroupMismatchException(
+ getId() + ": Request group id (" + request.getRaftGroupId() + ") does not match the given group id " + groupId));
+ }
+ final CompletableFuture<RaftServerImpl> f = impls.remove(groupId);
+ if (f == null) {
+ return JavaUtils.completeExceptionally(new GroupMismatchException(
+ getId() + ": Group " + groupId + " not found."));
+ }
+ return f.thenApply(impl -> {
+ final Collection<CommitInfoProto> commitInfos = impl.getCommitInfos();
+ impl.shutdown();
+ return new RaftClientReply(request, commitInfos);
+ });
+ }
+
@Override
public ServerInformationReply getInfo(ServerInformationRequest request)
throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index a38488a..30d9d10 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -191,8 +191,8 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
() -> client.setConfiguration(RaftPeer.emptyArray()),
GroupMismatchException.class);
- testFailureCase("reinitialize(..) with client group being different from the server group",
- () -> client.reinitialize(anotherGroup, clusterGroup.getPeers().iterator().next().getId()),
+ testFailureCase("groupRemove(..) with another group id",
+ () -> client.groupRemove(anotherGroup.getGroupId(), clusterGroup.getPeers().iterator().next().getId()),
GroupMismatchException.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
new file mode 100644
index 0000000..5310b94
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.CheckedBiConsumer;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LogUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public abstract class GroupManagementBaseTest extends BaseTest {
+ static final Logger LOG = LoggerFactory.getLogger(GroupManagementBaseTest.class);
+
+ {
+ LogUtils.setLogLevel(RaftServerProxy.LOG, Level.DEBUG);
+ LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+ LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+ }
+
+ static final RaftProperties prop = new RaftProperties();
+
+ public abstract MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory();
+
+ public MiniRaftCluster getCluster(int peerNum) throws IOException {
+ return getClusterFactory().newCluster(peerNum, prop);
+ }
+
+ @Test
+ public void testMultiGroup() throws Exception {
+ final MiniRaftCluster cluster = getCluster(0);
+ LOG.info("Start testMultiGroup" + cluster.printServers());
+
+ // Start server with an empty conf
+ final RaftGroupId groupId = cluster.getGroupId();
+ final RaftGroup group = new RaftGroup(groupId);
+
+ final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(3, 0))
+ .map(RaftPeerId::valueOf).collect(Collectors.toList());
+ ids.forEach(id -> cluster.putNewServer(id, group, true));
+ LOG.info("putNewServer: " + cluster.printServers());
+
+ cluster.start();
+
+ // Make sure that there are no leaders.
+ TimeUnit.SECONDS.sleep(1);
+ LOG.info("start: " + cluster.printServers());
+ Assert.assertNull(cluster.getLeader());
+
+ // Add groups
+ final RaftGroup newGroup = new RaftGroup(RaftGroupId.randomId(), cluster.getPeers());
+ LOG.info("add new group: " + newGroup);
+ final RaftClient client = cluster.createClient(newGroup);
+ for(RaftPeer p : newGroup.getPeers()) {
+ client.groupAdd(newGroup, p.getId());
+ }
+ Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true));
+ cluster.shutdown();
+ }
+
+ @Test
+ public void testMultiGroup5Nodes() throws Exception {
+ final int[] idIndex = {3, 4, 5};
+ runMultiGroupTest(idIndex, 0);
+ }
+
+ @Test
+ public void testMultiGroup7Nodes() throws Exception {
+ final int[] idIndex = {1, 6, 7};
+ runMultiGroupTest(idIndex, 1);
+ }
+
+ @Test
+ public void testMultiGroup9Nodes() throws Exception {
+ final int[] idIndex = {5, 8, 9};
+ runMultiGroupTest(idIndex, 2);
+ }
+
+ private void runMultiGroupTest(int[] idIndex, int chosen) throws Exception {
+ printThreadCount(null, "init");
+ runMultiGroupTest(getCluster(0), idIndex, chosen, NOOP);
+ }
+
+ static final CheckedBiConsumer<MiniRaftCluster, RaftGroup, RuntimeException> NOOP = (c, g) -> {};
+
+ public static <T extends Throwable> void runMultiGroupTest(
+ MiniRaftCluster cluster, int[] idIndex, int chosen,
+ CheckedBiConsumer<MiniRaftCluster, RaftGroup, T> checker)
+ throws IOException, InterruptedException, T {
+ if (chosen < 0) {
+ chosen = ThreadLocalRandom.current().nextInt(idIndex.length);
+ }
+ final String type = cluster.getClass().getSimpleName()
+ + Arrays.toString(idIndex) + "chosen=" + chosen;
+ LOG.info("\n\nrunMultiGroupTest with " + type + ": " + cluster.printServers());
+
+ // Start server with an empty conf
+ final RaftGroup emptyGroup = new RaftGroup(cluster.getGroupId());
+
+ final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(idIndex[idIndex.length - 1], 0))
+ .map(RaftPeerId::valueOf).collect(Collectors.toList());
+ LOG.info("ids: " + ids);
+ ids.forEach(id -> cluster.putNewServer(id, emptyGroup, true));
+ LOG.info("putNewServer: " + cluster.printServers());
+
+ TimeUnit.SECONDS.sleep(1);
+ cluster.start();
+
+ // Make sure that there are no leaders.
+ TimeUnit.SECONDS.sleep(1);
+ LOG.info("start: " + cluster.printServers());
+ Assert.assertNull(cluster.getLeader());
+
+ // Use groupAdd to partition the servers into idIndex.length groups
+ final List<RaftPeer> allPeers = cluster.getPeers();
+ Collections.sort(allPeers, Comparator.comparing(p -> p.getId().toString()));
+ final RaftGroup[] groups = new RaftGroup[idIndex.length];
+ for (int i = 0; i < idIndex.length; i++) {
+ final RaftGroupId gid = RaftGroupId.randomId();
+ final int previous = i == 0 ? 0 : idIndex[i - 1];
+ final RaftPeer[] peers = allPeers.subList(previous, idIndex[i]).toArray(RaftPeer.emptyArray());
+ groups[i] = new RaftGroup(gid, peers);
+
+ LOG.info(i + ") starting " + groups[i]);
+ for(RaftPeer p : peers) {
+ try(final RaftClient client = cluster.createClient(p.getId(), emptyGroup)) {
+ client.groupAdd(groups[i], p.getId());
+ }
+ }
+ Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true, gid));
+ checker.accept(cluster, groups[i]);
+ }
+ printThreadCount(type, "start groups");
+ LOG.info("start groups: " + cluster.printServers());
+
+ // remove every group except the chosen one
+ LOG.info("chosen = " + chosen + ", " + groups[chosen]);
+
+ for (int i = 0; i < groups.length; i++) {
+ if (i != chosen) {
+ final RaftGroup g = groups[i];
+ LOG.info(i + ") close " + cluster.printServers(g.getGroupId()));
+ for(RaftPeer p : g.getPeers()) {
+ try (final RaftClient client = cluster.createClient(p.getId(), g)) {
+ client.groupRemove(g.getGroupId(), p.getId());
+ }
+ }
+ }
+ }
+ printThreadCount(type, "close groups");
+ LOG.info("close groups: " + cluster.printServers());
+
+ // update chosen group to use all the peers
+ final RaftGroup newGroup = new RaftGroup(groups[chosen].getGroupId());
+ for(int i = 0; i < groups.length; i++) {
+ if (i != chosen) {
+ LOG.info(i + ") groupAdd: " + cluster.printServers(groups[i].getGroupId()));
+ for (RaftPeer p : groups[i].getPeers()) {
+ try (final RaftClient client = cluster.createClient(p.getId(), groups[i])) {
+ client.groupAdd(newGroup, p.getId());
+ }
+ }
+ }
+ }
+ LOG.info(chosen + ") setConfiguration: " + cluster.printServers(groups[chosen].getGroupId()));
+ try (final RaftClient client = cluster.createClient(groups[chosen])) {
+ client.setConfiguration(allPeers.toArray(RaftPeer.emptyArray()));
+ }
+
+ Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true));
+ checker.accept(cluster, groups[chosen]);
+ LOG.info("update groups: " + cluster.printServers());
+ printThreadCount(type, "update groups");
+
+ cluster.shutdown();
+ printThreadCount(type, "shutdown");
+ }
+
+ static void printThreadCount(String type, String label) {
+ System.out.println("| " + type + " | " + label + " | "
+ + JavaUtils.getRootThreadGroup().activeCount() + " |");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
deleted file mode 100644
index f6e417c..0000000
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.server.impl;
-
-import org.apache.log4j.Level;
-import org.apache.ratis.BaseTest;
-import org.apache.ratis.MiniRaftCluster;
-import org.apache.ratis.RaftTestUtil;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.util.CheckedBiConsumer;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.LogUtils;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-public abstract class ReinitializationBaseTest extends BaseTest {
- static final Logger LOG = LoggerFactory.getLogger(ReinitializationBaseTest.class);
-
- {
- LogUtils.setLogLevel(RaftServerProxy.LOG, Level.DEBUG);
- LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
- LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
- }
-
- static final RaftProperties prop = new RaftProperties();
-
- public abstract MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory();
-
- public MiniRaftCluster getCluster(int peerNum) throws IOException {
- return getClusterFactory().newCluster(peerNum, prop);
- }
-
- @Test
- public void testReinitialize() throws Exception {
- final MiniRaftCluster cluster = getCluster(0);
- LOG.info("Start testReinitialize" + cluster.printServers());
-
- // Start server with an empty conf
- final RaftGroupId groupId = cluster.getGroupId();
- final RaftGroup group = new RaftGroup(groupId);
-
- final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(3, 0))
- .map(RaftPeerId::valueOf).collect(Collectors.toList());
- ids.forEach(id -> cluster.putNewServer(id, group, true));
- LOG.info("putNewServer: " + cluster.printServers());
-
- cluster.start();
-
- // Make sure that there are no leaders.
- TimeUnit.SECONDS.sleep(1);
- LOG.info("start: " + cluster.printServers());
- Assert.assertNull(cluster.getLeader());
-
- // Reinitialize servers
- final RaftGroup newGroup = new RaftGroup(groupId, cluster.getPeers());
- final RaftClient client = cluster.createClient(newGroup);
- for(RaftPeer p : newGroup.getPeers()) {
- client.reinitialize(newGroup, p.getId());
- }
- Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true));
- cluster.shutdown();
- }
-
- @Test
- public void testReinitialize5Nodes() throws Exception {
- final int[] idIndex = {3, 4, 5};
- runTestReinitializeMultiGroups(idIndex, 0);
- }
-
- @Test
- public void testReinitialize7Nodes() throws Exception {
- final int[] idIndex = {1, 6, 7};
- runTestReinitializeMultiGroups(idIndex, 1);
- }
-
- @Test
- public void testReinitialize9Nodes() throws Exception {
- final int[] idIndex = {5, 8, 9};
- runTestReinitializeMultiGroups(idIndex, 2);
- }
-
- private void runTestReinitializeMultiGroups(int[] idIndex, int chosen) throws Exception {
- printThreadCount(null, "init");
- runTestReinitializeMultiGroups(getCluster(0), idIndex, chosen, NOOP);
- }
-
- static final CheckedBiConsumer<MiniRaftCluster, RaftGroup, RuntimeException> NOOP = (c, g) -> {};
-
- public static <T extends Throwable> void runTestReinitializeMultiGroups(
- MiniRaftCluster cluster, int[] idIndex, int chosen,
- CheckedBiConsumer<MiniRaftCluster, RaftGroup, T> checker)
- throws IOException, InterruptedException, T {
- if (chosen < 0) {
- chosen = ThreadLocalRandom.current().nextInt(idIndex.length);
- }
- final String type = cluster.getClass().getSimpleName()
- + Arrays.toString(idIndex) + "chosen=" + chosen;
- LOG.info("\n\nrunTestReinitializeMultiGroups with " + type + ": " + cluster.printServers());
-
- // Start server with an empty conf
- final RaftGroup emptyGroup = new RaftGroup(cluster.getGroupId());
-
- final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(idIndex[idIndex.length - 1], 0))
- .map(RaftPeerId::valueOf).collect(Collectors.toList());
- LOG.info("ids: " + ids);
- ids.forEach(id -> cluster.putNewServer(id, emptyGroup, true));
- LOG.info("putNewServer: " + cluster.printServers());
-
- TimeUnit.SECONDS.sleep(1);
- cluster.start();
-
- // Make sure that there are no leaders.
- TimeUnit.SECONDS.sleep(1);
- LOG.info("start: " + cluster.printServers());
- Assert.assertNull(cluster.getLeader());
-
- // Reinitialize servers to three groups
- final List<RaftPeer> allPeers = cluster.getPeers();
- Collections.sort(allPeers, Comparator.comparing(p -> p.getId().toString()));
- final RaftGroup[] groups = new RaftGroup[idIndex.length];
- for (int i = 0; i < idIndex.length; i++) {
- final RaftGroupId gid = RaftGroupId.randomId();
- final int previous = i == 0 ? 0 : idIndex[i - 1];
- final RaftPeer[] peers = allPeers.subList(previous, idIndex[i]).toArray(RaftPeer.emptyArray());
- groups[i] = new RaftGroup(gid, peers);
-
- LOG.info(i + ") starting " + groups[i]);
- for(RaftPeer p : peers) {
- try(final RaftClient client = cluster.createClient(p.getId(), emptyGroup)) {
- client.reinitialize(groups[i], p.getId());
- }
- }
- Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true, gid));
- checker.accept(cluster, groups[i]);
- }
- printThreadCount(type, "start groups");
- LOG.info("start groups: " + cluster.printServers());
-
- // randomly close two of the groups (i.e. reinitialize to empty peers)
- LOG.info("chosen = " + chosen + ", " + groups[chosen]);
-
- for (int i = 0; i < groups.length; i++) {
- if (i != chosen) {
- final RaftGroup g = groups[i];
- final RaftGroup newGroup = new RaftGroup(g.getGroupId());
- LOG.info(i + ") close " + cluster.printServers(g.getGroupId()));
- for(RaftPeer p : g.getPeers()) {
- try (final RaftClient client = cluster.createClient(p.getId(), g)) {
- client.reinitialize(newGroup, p.getId());
- }
- }
- }
- }
- printThreadCount(type, "close groups");
- LOG.info("close groups: " + cluster.printServers());
-
- // update chosen group to use all the peers
- final RaftGroup newGroup = new RaftGroup(groups[chosen].getGroupId());
- for(int i = 0; i < groups.length; i++) {
- if (i != chosen) {
- LOG.info(i + ") reinitialize: " + cluster.printServers(groups[i].getGroupId()));
- for (RaftPeer p : groups[i].getPeers()) {
- try (final RaftClient client = cluster.createClient(p.getId(), groups[i])) {
- client.reinitialize(newGroup, p.getId());
- }
- }
- }
- }
- LOG.info(chosen + ") setConfiguration: " + cluster.printServers(groups[chosen].getGroupId()));
- try (final RaftClient client = cluster.createClient(groups[chosen])) {
- client.setConfiguration(allPeers.toArray(RaftPeer.emptyArray()));
- }
-
- Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true));
- checker.accept(cluster, groups[chosen]);
- LOG.info("update groups: " + cluster.printServers());
- printThreadCount(type, "update groups");
-
- cluster.shutdown();
- printThreadCount(type, "shutdown");
- }
-
- static void printThreadCount(String type, String label) {
- System.out.println("| " + type + " | " + label + " | "
- + JavaUtils.getRootThreadGroup().activeCount() + " |");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index c4a9a5e..d9bbc43 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -155,9 +155,9 @@ class SimulatedServerRpc implements RaftServerRpc {
public RaftClientReply handleRequest(RaftClientRequest request)
throws IOException {
final CompletableFuture<RaftClientReply> future;
- if (request instanceof ReinitializeRequest) {
+ if (request instanceof GroupManagementRequest) {
future = CompletableFuture.completedFuture(
- server.reinitialize((ReinitializeRequest) request));
+ server.groupManagement((GroupManagementRequest) request));
} else if (request instanceof ServerInformationRequest) {
future = CompletableFuture.completedFuture(
server.getInfo((ServerInformationRequest) request));
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestGroupManagementWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestGroupManagementWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestGroupManagementWithSimulatedRpc.java
new file mode 100644
index 0000000..73fbae9
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestGroupManagementWithSimulatedRpc.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.server.impl.GroupManagementBaseTest;
+
+public class TestGroupManagementWithSimulatedRpc extends GroupManagementBaseTest { // concrete subclass; only supplies the cluster factory
+ @Override
+ public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() {
+ return MiniRaftClusterWithSimulatedRpc.FACTORY; // presumably builds simulated-RPC clusters (by name) - TODO confirm
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestReinitializationWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestReinitializationWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestReinitializationWithSimulatedRpc.java
deleted file mode 100644
index 7fc0c6c..0000000
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestReinitializationWithSimulatedRpc.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.server.simulation;
-
-import org.apache.ratis.MiniRaftCluster;
-import org.apache.ratis.server.impl.ReinitializationBaseTest;
-
-public class TestReinitializationWithSimulatedRpc extends ReinitializationBaseTest {
- @Override
- public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() {
- return MiniRaftClusterWithSimulatedRpc.FACTORY;
- }
-}