You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ms...@apache.org on 2018/09/17 17:43:20 UTC
incubator-ratis git commit: RATIS-315. Add an option to delete the group directory in groupRemove. Contributed by Tsz Wo Nicholas Sze.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 50588bde3 -> eca35312c
RATIS-315. Add an option to delete the group directory in groupRemove. Contributed by Tsz Wo Nicholas Sze.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/eca35312
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/eca35312
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/eca35312
Branch: refs/heads/master
Commit: eca35312cef825c209c14b3dff3bc2b94311ef14
Parents: 50588bd
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Mon Sep 17 23:09:27 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Mon Sep 17 23:09:27 2018 +0530
----------------------------------------------------------------------
.../org/apache/ratis/client/RaftClient.java | 2 +-
.../ratis/client/impl/ClientProtoUtils.java | 9 ++-
.../ratis/client/impl/RaftClientImpl.java | 4 +-
.../ratis/protocol/GroupManagementRequest.java | 15 +++--
.../java/org/apache/ratis/util/StringUtils.java | 13 ++++
.../org/apache/ratis/grpc/TestRaftWithGrpc.java | 30 ++++-----
.../ratis/hadooprpc/TestRaftWithHadoopRpc.java | 40 ++++++------
.../ratis/netty/server/NettyRpcService.java | 8 +--
.../apache/ratis/netty/TestRaftWithNetty.java | 17 +----
ratis-proto-shaded/src/main/proto/Raft.proto | 1 +
.../org/apache/ratis/server/RaftServer.java | 2 +-
.../ratis/server/impl/ConfigurationManager.java | 9 ++-
.../apache/ratis/server/impl/LeaderState.java | 14 ++---
.../ratis/server/impl/RaftConfiguration.java | 2 +-
.../ratis/server/impl/RaftServerImpl.java | 18 ++++--
.../ratis/server/impl/RaftServerProxy.java | 49 +++++++++++----
.../ratis/server/impl/ServerImplUtils.java | 4 +-
.../ratis/server/impl/ServerProtoUtils.java | 8 ++-
.../apache/ratis/server/impl/ServerState.java | 23 +++----
.../server/storage/RaftStorageDirectory.java | 5 +-
.../java/org/apache/ratis/MiniRaftCluster.java | 39 +++++++-----
.../java/org/apache/ratis/RaftBasicTests.java | 66 +++++++++-----------
.../org/apache/ratis/RaftExceptionBaseTest.java | 2 +-
.../java/org/apache/ratis/RaftTestUtil.java | 4 +-
.../server/impl/GroupManagementBaseTest.java | 32 +++++++---
.../impl/RaftReconfigurationBaseTest.java | 17 ++---
.../simulation/TestRaftWithSimulatedRpc.java | 27 +-------
27 files changed, 254 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 20d746b..ea8b1a2 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -95,7 +95,7 @@ public interface RaftClient extends Closeable {
RaftClientReply groupAdd(RaftGroup newGroup, RaftPeerId server) throws IOException;
/** Send groupRemove request to the given server (not the raft service). */
- RaftClientReply groupRemove(RaftGroupId groupId, RaftPeerId server) throws IOException;
+ RaftClientReply groupRemove(RaftGroupId groupId, boolean deleteDirectory, RaftPeerId server) throws IOException;
/** Send serverInformation request to the given server.*/
RaftClientReply serverInformation(RaftPeerId server) throws IOException;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 7065dd4..b5a6172 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -302,8 +302,9 @@ public interface ClientProtoUtils {
return GroupManagementRequest.newAdd(clientId, serverId, m.getCallId(),
ProtoUtils.toRaftGroup(p.getGroupAdd().getGroup()));
case GROUPREMOVE:
+ final GroupRemoveRequestProto remove = p.getGroupRemove();
return GroupManagementRequest.newRemove(clientId, serverId, m.getCallId(),
- ProtoUtils.toRaftGroupId(p.getGroupRemove().getGroupId()));
+ ProtoUtils.toRaftGroupId(remove.getGroupId()), remove.getDeleteDirectory());
default:
throw new IllegalArgumentException("Unexpected op " + p.getOpCase() + " in " + p);
}
@@ -329,8 +330,10 @@ public interface ClientProtoUtils {
}
final GroupManagementRequest.Remove remove = request.getRemove();
if (remove != null) {
- b.setGroupRemove(GroupRemoveRequestProto.newBuilder().setGroupId(
- ProtoUtils.toRaftGroupIdProtoBuilder(remove.getGroupId())).build());
+ b.setGroupRemove(GroupRemoveRequestProto.newBuilder()
+ .setGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(remove.getGroupId()))
+ .setDeleteDirectory(remove.isDeleteDirectory())
+ .build());
}
return b.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index d6c9a27..a07e229 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -221,12 +221,12 @@ final class RaftClientImpl implements RaftClient {
}
@Override
- public RaftClientReply groupRemove(RaftGroupId groupId, RaftPeerId server) throws IOException {
+ public RaftClientReply groupRemove(RaftGroupId groupId, boolean deleteDirectory, RaftPeerId server) throws IOException {
Objects.requireNonNull(groupId, "groupId == null");
Objects.requireNonNull(server, "server == null");
final long callId = nextCallId();
- return sendRequest(GroupManagementRequest.newRemove(clientId, server, callId, groupId));
+ return sendRequest(GroupManagementRequest.newRemove(clientId, server, callId, groupId, deleteDirectory));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
index 8577548..cbaae3b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
@@ -46,9 +46,11 @@ public class GroupManagementRequest extends RaftClientRequest {
public static class Remove extends Op {
private final RaftGroupId groupId;
+ private final boolean deleteDirectory;
- public Remove(RaftGroupId groupId) {
+ public Remove(RaftGroupId groupId, boolean deleteDirectory) {
this.groupId = groupId;
+ this.deleteDirectory = deleteDirectory;
}
@Override
@@ -56,9 +58,13 @@ public class GroupManagementRequest extends RaftClientRequest {
return groupId;
}
+ public boolean isDeleteDirectory() {
+ return deleteDirectory;
+ }
+
@Override
public String toString() {
- return getClass().getSimpleName() + ":" + getGroupId();
+ return getClass().getSimpleName() + ":" + getGroupId() + ", " + (deleteDirectory? "delete": "retain") + "-dir";
}
}
@@ -66,8 +72,9 @@ public class GroupManagementRequest extends RaftClientRequest {
return new GroupManagementRequest(clientId, serverId, callId, new Add(group));
}
- public static GroupManagementRequest newRemove(ClientId clientId, RaftPeerId serverId, long callId, RaftGroupId groupId) {
- return new GroupManagementRequest(clientId, serverId, callId, new Remove(groupId));
+ public static GroupManagementRequest newRemove(ClientId clientId, RaftPeerId serverId, long callId,
+ RaftGroupId groupId, boolean deleteDirectory) {
+ return new GroupManagementRequest(clientId, serverId, callId, new Remove(groupId, deleteDirectory));
}
private final Op op;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
index 710a0a2..70039b2 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
@@ -25,6 +25,7 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.Locale;
+import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
@@ -128,4 +129,16 @@ public class StringUtils {
}
};
}
+
+ public static <K, V> String map2String(Map<K, V> map) {
+ if (map == null) {
+ return null;
+ } else if (map.isEmpty()) {
+ return "<EMPTY_MAP>";
+ } else {
+ final StringBuilder b = new StringBuilder("{");
+ map.entrySet().stream().forEach(e -> b.append("\n ").append(e.getKey()).append(" -> ").append(e.getValue()));
+ return b.append("\n}").toString();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index 30fcd91..16c0f31 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -30,27 +30,18 @@ import org.apache.ratis.statemachine.StateMachine;
import org.junit.Assert;
import org.junit.Test;
-import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
-public class TestRaftWithGrpc extends RaftBasicTests {
- private final MiniRaftClusterWithGRpc cluster;
+public class TestRaftWithGrpc
+ extends RaftBasicTests<MiniRaftClusterWithGRpc>
+ implements MiniRaftClusterWithGRpc.FactoryGet {
- public TestRaftWithGrpc() throws IOException {
- properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+ {
+ getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
SimpleStateMachine4Testing.class, StateMachine.class);
- cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(
- NUM_SERVERS, properties);
- Assert.assertNull(cluster.getLeader());
- }
-
- @Override
- public MiniRaftClusterWithGRpc getCluster() {
- return cluster;
}
@Override
@@ -62,14 +53,17 @@ public class TestRaftWithGrpc extends RaftBasicTests {
@Test
public void testRequestTimeout() throws Exception {
- testRequestTimeout(false, getCluster(), LOG);
+ try(MiniRaftClusterWithGRpc cluster = newCluster(NUM_SERVERS)) {
+ cluster.start();
+ testRequestTimeout(false, cluster, LOG);
+ }
}
@Test
- public void testUpdateViaHeartbeat()
- throws IOException, InterruptedException, ExecutionException {
+ public void testUpdateViaHeartbeat() throws Exception {
LOG.info("Running testUpdateViaHeartbeat");
- final MiniRaftClusterWithGRpc cluster = getCluster();
+ final MiniRaftClusterWithGRpc cluster = newCluster(NUM_SERVERS);
+ cluster.start();
waitForLeader(cluster);
long waitTime = 5000;
try (final RaftClient client = cluster.createClient()) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
index 3f51c0b..659d37c 100644
--- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
+++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
@@ -20,34 +20,33 @@ package org.apache.ratis.hadooprpc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.log4j.Level;
+import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftBasicTests;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.util.LogUtils;
import org.junit.Test;
-import java.io.IOException;
-
-public class TestRaftWithHadoopRpc extends RaftBasicTests {
+public class TestRaftWithHadoopRpc
+ extends RaftBasicTests<MiniRaftClusterWithHadoopRpc> {
static {
LogUtils.setLogLevel(MiniRaftClusterWithHadoopRpc.LOG, Level.DEBUG);
}
- private final MiniRaftClusterWithHadoopRpc cluster;
-
- public TestRaftWithHadoopRpc() throws IOException {
- final Configuration conf = new Configuration();
- HadoopConfigKeys.Ipc.setHandlers(conf, 20);
- conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
- conf.setInt(CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, 1000);
- conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
- cluster = MiniRaftClusterWithHadoopRpc.FACTORY.newCluster(
- NUM_SERVERS, getProperties(), conf);
- }
-
- @Override
- public MiniRaftClusterWithHadoopRpc getCluster() {
- return cluster;
+ static final Configuration CONF = new Configuration();
+ static {
+ HadoopConfigKeys.Ipc.setHandlers(CONF, 20);
+ CONF.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+ CONF.setInt(CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, 1000);
+ CONF.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
}
+ static final MiniRaftCluster.Factory<MiniRaftClusterWithHadoopRpc> FACTORY
+ = new MiniRaftClusterWithHadoopRpc.Factory() {
+ @Override
+ public MiniRaftClusterWithHadoopRpc newCluster(String[] ids, RaftProperties prop) {
+ return newCluster(ids, prop, CONF);
+ }
+ };
@Override
@Test
@@ -55,4 +54,9 @@ public class TestRaftWithHadoopRpc extends RaftBasicTests {
super.testWithLoad();
BlockRequestHandlingInjection.getInstance().unblockAll();
}
+
+ @Override
+ public MiniRaftCluster.Factory<MiniRaftClusterWithHadoopRpc> getFactory() {
+ return FACTORY;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 7dba943..f6fcbc6 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -274,12 +274,12 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy,
RaftRpcRequestProto request, RaftNettyServerRequestProto proto)
throws IOException {
final RaftPeerId id = RaftPeerId.valueOf(request.getReplyId());
- final NettyRpcProxy p = getProxies().getProxy(id);
try {
+ final NettyRpcProxy p = getProxies().getProxy(id);
return p.send(request, proto);
- } catch (ClosedChannelException cce) {
- getProxies().resetProxy(id);
- throw cce;
+ } catch (Exception e) {
+ getProxies().handleException(id, e, false);
+ throw e;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java
index b3996ac..28815d7 100644
--- a/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java
@@ -21,20 +21,9 @@ import org.apache.ratis.RaftBasicTests;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.junit.Test;
-import java.io.IOException;
-
-public class TestRaftWithNetty extends RaftBasicTests {
- private final MiniRaftClusterWithNetty cluster;
-
- public TestRaftWithNetty() throws IOException {
- cluster = MiniRaftClusterWithNetty.FACTORY.newCluster(
- NUM_SERVERS, getProperties());
- }
-
- @Override
- public MiniRaftClusterWithNetty getCluster() {
- return cluster;
- }
+public class TestRaftWithNetty
+ extends RaftBasicTests<MiniRaftClusterWithNetty>
+ implements MiniRaftClusterWithNetty.FactoryGet {
@Override
@Test
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index 3d7eab3..0a93e95 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -251,6 +251,7 @@ message GroupAddRequestProto {
message GroupRemoveRequestProto {
RaftGroupIdProto groupId = 1; // the group to be removed.
+ bool deleteDirectory = 2; // delete the directory for that group?
}
message GroupManagementRequestProto {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index e071d4b..ac3111e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -64,7 +64,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
class Builder {
private RaftPeerId serverId;
private StateMachine stateMachine;
- private RaftGroup group = RaftGroup.emptyGroup();
+ private RaftGroup group = null;
private RaftProperties properties;
private Parameters parameters;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
index 6aed1d7..f8e5e57 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
@@ -18,6 +18,7 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.StringUtils;
import java.util.*;
@@ -42,8 +43,7 @@ public class ConfigurationManager {
this.currentConf = initialConf;
}
- public synchronized void addConfiguration(long logIndex,
- RaftConfiguration conf) {
+ synchronized void addConfiguration(long logIndex, RaftConfiguration conf) {
Preconditions.assertTrue(configurations.isEmpty() ||
configurations.lastEntry().getKey() < logIndex);
configurations.put(logIndex, conf);
@@ -76,5 +76,10 @@ public class ConfigurationManager {
return 1 + configurations.size();
}
+ @Override
+ public synchronized String toString() {
+ return getClass().getSimpleName() + ", init=" + initialConf + ", confs=" + StringUtils.map2String(configurations);
+ }
+
// TODO: remove Configuration entries after they are committed
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index a13284f..839aa69 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -24,7 +24,6 @@ import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.LeaderNoOp;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.*;
@@ -152,21 +151,22 @@ public class LeaderState {
voterLists = divideFollowers(conf);
}
- void start() {
- // In the beginning of the new term, replicate an empty entry in order
+ LogEntryProto start() {
+ // In the beginning of the new term, replicate a conf entry in order
// to finally commit entries in the previous term.
- // Also this message can help identify the last committed index when
- // the leader peer is just started.
+ // Also this message can help identify the last committed index and the conf.
final LogEntryProto placeHolder = LogEntryProto.newBuilder()
.setTerm(server.getState().getCurrentTerm())
.setIndex(raftLog.getNextIndex())
- .setNoOp(LeaderNoOp.newBuilder()).build();
+ .setConfigurationEntry(ServerProtoUtils.toRaftConfigurationProto(server.getRaftConf()))
+ .build();
CodeInjectionForTesting.execute(APPEND_PLACEHOLDER,
server.getId().toString(), null);
raftLog.append(placeHolder);
processor.start();
senders.forEach(LogAppender::startAppender);
+ return placeHolder;
}
boolean isReady() {
@@ -507,7 +507,7 @@ public class LeaderState {
}
// the pending request handler will send NotLeaderException for
// pending client requests when it stops
- server.shutdown();
+ server.shutdown(false);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
index 0034f4e..eac0f58 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
@@ -215,7 +215,7 @@ public class RaftConfiguration {
@Override
public String toString() {
- return conf + ", old=" + oldConf;
+ return logEntryIndex + ": " + conf + ", old=" + oldConf;
}
boolean hasNoChange(RaftPeer[] newMembers) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index e33177a..1428c90 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -27,6 +27,7 @@ import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.statemachine.SnapshotInfo;
@@ -46,7 +47,6 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.ratis.server.impl.ServerProtoUtils.toRaftConfiguration;
import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*;
import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY;
@@ -237,7 +237,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
return new RaftGroup(groupId, getRaftConf().getPeers());
}
- void shutdown() {
+ void shutdown(boolean deleteDirectory) {
lifeCycle.checkStateAndClose(() -> {
LOG.info("{}: shutdown {}", getId(), groupId);
try {
@@ -265,6 +265,14 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
} catch (Exception ignored) {
LOG.warn("Failed to close state for " + getId(), ignored);
}
+ if (deleteDirectory) {
+ final RaftStorageDirectory dir = state.getStorage().getStorageDir();
+ try {
+ FileUtils.deleteFully(dir.getRoot());
+ } catch(Exception ignored) {
+ LOG.warn(getId() + ": Failed to remove RaftStorageDirectory " + dir, ignored);
+ }
+ }
});
}
@@ -342,7 +350,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
// start sending AppendEntries RPC to followers
leaderState = new LeaderState(this, getProxy().getProperties());
- leaderState.start();
+ final LogEntryProto e = leaderState.start();
+ getState().setRaftConf(e.getIndex(), ServerProtoUtils.toRaftConfiguration(e));
}
private void startHeartbeatMonitor() {
@@ -1124,8 +1133,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
// the reply should have already been set. only need to record
// the new conf in the state machine.
- stateMachine.setRaftConfiguration(toRaftConfiguration(next.getIndex(),
- next.getConfigurationEntry()));
+ stateMachine.setRaftConfiguration(ServerProtoUtils.toRaftConfiguration(next));
} else if (next.getLogEntryBodyCase() == SMLOGENTRY) {
// check whether there is a TransactionContext because we are the leader.
TransactionContext trx = getTransactionContext(next.getIndex());
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index aea2767..0afd596 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -23,6 +23,7 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.statemachine.StateMachine;
@@ -36,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
+import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -77,17 +79,13 @@ public class RaftServerProxy implements RaftServer {
final CompletableFuture<RaftServerImpl> newImpl = newRaftServerImpl(group);
final CompletableFuture<RaftServerImpl> previous = map.put(groupId, newImpl);
Preconditions.assertNull(previous, "previous");
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: addNew {} returns {}", getId(), group, toString(groupId, newImpl));
- }
+ LOG.info("{}: addNew {} returns {}", getId(), group, toString(groupId, newImpl));
return newImpl;
}
synchronized CompletableFuture<RaftServerImpl> remove(RaftGroupId groupId) {
final CompletableFuture<RaftServerImpl> future = map.remove(groupId);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: remove {}", getId(), toString(groupId, future));
- }
+ LOG.info("{}: remove {}", getId(), toString(groupId, future));
return future;
}
@@ -98,7 +96,8 @@ public class RaftServerProxy implements RaftServer {
return;
}
isClosed = true;
- map.values().parallelStream().map(CompletableFuture::join).forEach(RaftServerImpl::shutdown);
+ map.values().parallelStream().map(CompletableFuture::join)
+ .forEach(impl -> impl.shutdown(false));
}
synchronized List<CompletableFuture<RaftServerImpl>> getAll() {
@@ -162,6 +161,29 @@ public class RaftServerProxy implements RaftServer {
this.lifeCycle = new LifeCycle(this.id);
}
+ /** Check the storage dir and add groups*/
+ void initGroups(RaftGroup group) {
+ final File dir = RaftServerConfigKeys.storageDir(properties);
+ if (dir.isDirectory()) {
+ for(File sub : dir.listFiles()) {
+ if (sub.isDirectory()) {
+ LOG.info("{}: found a subdirectory {}", getId(), sub);
+ try {
+ final RaftGroupId groupId = RaftGroupId.valueOf(UUID.fromString(sub.getName()));
+ if (group == null || !groupId.equals(group.getGroupId())) {
+ addGroup(new RaftGroup(groupId));
+ }
+ } catch(Throwable t) {
+ LOG.warn(getId() + ": Failed to initialize the group directory " + sub.getAbsolutePath() + ". Ignoring it", t);
+ }
+ }
+ }
+ }
+ if (group != null) {
+ addGroup(group);
+ }
+ }
+
private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group) {
return CompletableFuture.supplyAsync(() -> {
try {
@@ -217,7 +239,7 @@ public class RaftServerProxy implements RaftServer {
return impls.containsGroup(groupId);
}
- CompletableFuture<RaftServerImpl> addGroup(RaftGroup group) {
+ public CompletableFuture<RaftServerImpl> addGroup(RaftGroup group) {
return impls.addNew(group);
}
@@ -309,17 +331,17 @@ public class RaftServerProxy implements RaftServer {
}
final GroupManagementRequest.Add add = request.getAdd();
if (add != null) {
- return groupdAddAsync(request, add.getGroup());
+ return groupAddAsync(request, add.getGroup());
}
final GroupManagementRequest.Remove remove = request.getRemove();
if (remove != null) {
- return groupRemoveAsync(request, remove.getGroupId());
+ return groupRemoveAsync(request, remove.getGroupId(), remove.isDeleteDirectory());
}
return JavaUtils.completeExceptionally(new UnsupportedOperationException(
getId() + ": Request not supported " + request));
}
- private CompletableFuture<RaftClientReply> groupdAddAsync(GroupManagementRequest request, RaftGroup newGroup) {
+ private CompletableFuture<RaftClientReply> groupAddAsync(GroupManagementRequest request, RaftGroup newGroup) {
if (!request.getRaftGroupId().equals(newGroup.getGroupId())) {
return JavaUtils.completeExceptionally(new GroupMismatchException(
getId() + ": Request group id (" + request.getRaftGroupId() + ") does not match the new group " + newGroup));
@@ -339,7 +361,8 @@ public class RaftServerProxy implements RaftServer {
});
}
- private CompletableFuture<RaftClientReply> groupRemoveAsync(RaftClientRequest request, RaftGroupId groupId) {
+ private CompletableFuture<RaftClientReply> groupRemoveAsync(
+ RaftClientRequest request, RaftGroupId groupId, boolean deleteDirectory) {
if (!request.getRaftGroupId().equals(groupId)) {
return JavaUtils.completeExceptionally(new GroupMismatchException(
getId() + ": Request group id (" + request.getRaftGroupId() + ") does not match the given group id " + groupId));
@@ -351,7 +374,7 @@ public class RaftServerProxy implements RaftServer {
}
return f.thenApply(impl -> {
final Collection<CommitInfoProto> commitInfos = impl.getCommitInfos();
- impl.shutdown();
+ impl.shutdown(deleteDirectory);
return new RaftClientReply(request, commitInfos);
});
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 7b80dcf..2befb38 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -36,9 +36,7 @@ public class ServerImplUtils {
RaftProperties properties, Parameters parameters) throws IOException {
RaftServerProxy.LOG.debug("newRaftServer: {}, {}", id, group);
final RaftServerProxy proxy = newRaftServer(id, gid -> stateMachine, properties, parameters);
- if (group != null) {
- proxy.addGroup(group);
- }
+ proxy.initGroups(group);
return proxy;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 6855473..3774737 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -31,6 +31,7 @@ import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.*;
+import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
/** Server proto utilities for internal use. */
@@ -91,11 +92,12 @@ public class ServerProtoUtils {
.build();
}
- public static RaftConfiguration toRaftConfiguration(
- long index, RaftConfigurationProto proto) {
+ public static RaftConfiguration toRaftConfiguration(LogEntryProto entry) {
+ Preconditions.assertTrue(ProtoUtils.isConfigurationLogEntry(entry));
+ final RaftConfigurationProto proto = entry.getConfigurationEntry();
final RaftConfiguration.Builder b = RaftConfiguration.newBuilder()
.setConf(ProtoUtils.toRaftPeerArray(proto.getPeersList()))
- .setLogEntryIndex(index);
+ .setLogEntryIndex(entry.getIndex());
if (proto.getOldPeersCount() > 0) {
b.setOldConf(ProtoUtils.toRaftPeerArray(proto.getOldPeersList()));
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index c5a6a98..3f064a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -89,9 +89,11 @@ public class ServerState implements Closeable {
RaftConfiguration initialConf = RaftConfiguration.newBuilder()
.setConf(group.getPeers()).build();
configurationManager = new ConfigurationManager(initialConf);
+ LOG.info("{}: {}", id, configurationManager);
final File dir = RaftServerConfigKeys.storageDir(prop);
- storage = new RaftStorage(new File(dir, group.getGroupId().toString()),
+ // use full uuid string to create a subdirectory
+ storage = new RaftStorage(new File(dir, group.getGroupId().getUuid().toString()),
RaftServerConstants.StartupOption.REGULAR);
snapshotManager = new SnapshotManager(storage, id);
@@ -107,9 +109,7 @@ public class ServerState implements Closeable {
// do not know whether the local log entries have been committed.
log = initLog(id, prop, lastApplied, entry -> {
if (entry.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
- configurationManager.addConfiguration(entry.getIndex(),
- ServerProtoUtils.toRaftConfiguration(entry.getIndex(),
- entry.getConfigurationEntry()));
+ setRaftConf(entry.getIndex(), ServerProtoUtils.toRaftConfiguration(entry));
}
});
@@ -134,8 +134,7 @@ public class ServerState implements Closeable {
// get the raft configuration from the snapshot
RaftConfiguration raftConf = sm.getRaftConfiguration();
if (raftConf != null) {
- configurationManager.addConfiguration(raftConf.getLogEntryIndex(),
- raftConf);
+ setRaftConf(raftConf.getLogEntryIndex(), raftConf);
}
return snapshot.getIndex();
}
@@ -314,10 +313,11 @@ public class ServerState implements Closeable {
getRaftConf().getLogEntryIndex();
}
- public void setRaftConf(long logIndex, RaftConfiguration conf) {
+ void setRaftConf(long logIndex, RaftConfiguration conf) {
configurationManager.addConfiguration(logIndex, conf);
- LOG.info("{}: successfully update the configuration {}",
- getSelfId(), conf);
+ server.getServerRpc().addPeers(conf.getPeers());
+ LOG.info("{}: set configuration {} at {}", getSelfId(), conf, logIndex);
+ LOG.debug("{}: {}", getSelfId(), configurationManager);
}
void updateConfiguration(LogEntryProto[] entries) {
@@ -325,10 +325,7 @@ public class ServerState implements Closeable {
configurationManager.removeConfigurations(entries[0].getIndex());
for (LogEntryProto entry : entries) {
if (ProtoUtils.isConfigurationLogEntry(entry)) {
- final RaftConfiguration conf = ServerProtoUtils.toRaftConfiguration(
- entry.getIndex(), entry.getConfigurationEntry());
- configurationManager.addConfiguration(entry.getIndex(), conf);
- server.getServerRpc().addPeers(conf.getPeers());
+ setRaftConf(entry.getIndex(), ServerProtoUtils.toRaftConfiguration(entry));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
index bcb5823..05208a3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
@@ -108,12 +108,11 @@ public class RaftStorageDirectory {
* writing the version file to disk.
*/
void clearDirectory() throws IOException {
- File curDir = this.getCurrentDir();
- clearDirectory(curDir);
+ clearDirectory(getCurrentDir());
clearDirectory(getStateMachineDir());
}
- void clearDirectory(File dir) throws IOException {
+ private static void clearDirectory(File dir) throws IOException {
if (dir.exists()) {
LOG.info(dir + " already exists. Deleting it ...");
FileUtils.deleteFully(dir);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 8104938..7a1e83e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -36,6 +36,7 @@ import org.apache.ratis.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -43,6 +44,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -52,7 +54,7 @@ import java.util.stream.StreamSupport;
import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
-public abstract class MiniRaftCluster {
+public abstract class MiniRaftCluster implements Closeable {
public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class);
public static final String CLASS_NAME = MiniRaftCluster.class.getSimpleName();
@@ -96,10 +98,9 @@ public abstract class MiniRaftCluster {
}
}
- public static int getPort(RaftPeerId id, RaftGroup group) {
- final List<RaftPeer> peers = group.getPeers().stream()
- .filter(raftPeer -> raftPeer.getId().equals(id)).collect(Collectors.toList());
- final String address = peers.isEmpty() ? null : peers.get(0).getAddress();
+ protected int getPort(RaftPeerId id, RaftGroup g) {
+ final RaftPeer p = g != null? g.getPeer(id): peers.get(id);
+ final String address = p == null? null : p.getAddress();
final InetSocketAddress inetAddress = address != null?
NetUtils.createSocketAddr(address): NetUtils.createLocalServerAddress();
return inetAddress.getPort();
@@ -126,9 +127,12 @@ public abstract class MiniRaftCluster {
return new RaftGroup(RaftGroupId.randomId(), peers);
}
+ private final Supplier<File> rootTestDir = JavaUtils.memoize(
+ () -> new File(BaseTest.getRootTestDir(),
+ getClass().getSimpleName() + Integer.toHexString(ThreadLocalRandom.current().nextInt())));
+
private File getStorageDir(RaftPeerId id) {
- return new File(BaseTest.getRootTestDir()
- + "/" + getClass().getSimpleName() + "/" + id);
+ return new File(rootTestDir.get(), id.toString());
}
public static String[] generateIds(int numServers, int base) {
@@ -143,12 +147,13 @@ public abstract class MiniRaftCluster {
protected final RaftProperties properties;
protected final Parameters parameters;
protected final Map<RaftPeerId, RaftServerProxy> servers = new ConcurrentHashMap<>();
+ protected final Map<RaftPeerId, RaftPeer> peers = new ConcurrentHashMap<>();
private final Timer timer;
protected MiniRaftCluster(String[] ids, RaftProperties properties, Parameters parameters) {
this.group = initRaftGroup(Arrays.asList(ids));
- LOG.info("new MiniRaftCluster {}", group);
+ LOG.info("new {} with {}", getClass().getSimpleName(), group);
this.properties = new RaftProperties(properties);
this.parameters = parameters;
@@ -169,20 +174,17 @@ public abstract class MiniRaftCluster {
return this;
}
- private RaftServerProxy putNewServer(RaftPeerId id, boolean format) {
- return putNewServer(id, group, format);
- }
-
public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean format) {
final RaftServerProxy s = newRaftServer(id, group, format);
Preconditions.assertTrue(servers.put(id, s) == null);
+ peers.put(id, toRaftPeer(s));
return s;
}
private Collection<RaftServerProxy> putNewServers(
Iterable<RaftPeerId> peers, boolean format) {
return StreamSupport.stream(peers.spliterator(), false)
- .map(id -> putNewServer(id, format))
+ .map(id -> putNewServer(id, group, format))
.collect(Collectors.toList());
}
@@ -201,10 +203,14 @@ public abstract class MiniRaftCluster {
* start a stopped server again.
*/
public void restartServer(RaftPeerId newId, boolean format) throws IOException {
+ restartServer(newId, group, format);
+ }
+
+ public void restartServer(RaftPeerId newId, RaftGroup group, boolean format) throws IOException {
killServer(newId);
servers.remove(newId);
- putNewServer(newId, format).start();
+ putNewServer(newId, group, format).start();
}
public void restart(boolean format) throws IOException {
@@ -547,6 +553,11 @@ public abstract class MiniRaftCluster {
}
}
+ @Override
+ public void close() {
+ shutdown();
+ }
+
public void shutdown() {
LOG.info("************************************************************** ");
LOG.info("*** ");
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 8c0def9..8a41d90 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -21,7 +21,6 @@ import org.apache.log4j.Level;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.RaftClientTestUtil;
-import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.NotReplicatedException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
@@ -40,9 +39,7 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -51,7 +48,6 @@ import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -65,52 +61,42 @@ import static org.apache.ratis.RaftTestUtil.sendMessageInNewThread;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
import static org.junit.Assert.assertTrue;
-public abstract class RaftBasicTests extends BaseTest {
+public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
+ extends BaseTest
+ implements MiniRaftCluster.Factory.Get<CLUSTER> {
{
LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
LogUtils.setLogLevel(RaftServerTestUtil.getStateMachineUpdaterLog(), Level.DEBUG);
LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
- RaftServerConfigKeys.RetryCache.setExpiryTime(properties, TimeDuration
+
+ RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration
.valueOf(5, TimeUnit.SECONDS));
}
public static final int NUM_SERVERS = 5;
- protected static final RaftProperties properties = new RaftProperties();
-
- public abstract MiniRaftCluster getCluster();
-
- public RaftProperties getProperties() {
- return properties;
- }
-
- @Before
- public void setup() throws IOException {
- Assert.assertNull(getCluster().getLeader());
- getCluster().start();
- }
-
- @After
- public void tearDown() {
- final MiniRaftCluster cluster = getCluster();
- if (cluster != null) {
- cluster.shutdown();
- }
- }
-
@Test
public void testBasicAppendEntries() throws Exception {
- runTestBasicAppendEntries(false, ReplicationLevel.MAJORITY, false, 10, getCluster(), LOG);
+ try(CLUSTER cluster = newCluster(NUM_SERVERS)) {
+ cluster.start();
+ runTestBasicAppendEntries(false, ReplicationLevel.MAJORITY, false, 10, cluster, LOG);
+ }
}
@Test
public void testBasicAppendEntriesKillLeader() throws Exception {
- runTestBasicAppendEntries(false, ReplicationLevel.MAJORITY, true, 10, getCluster(), LOG);
+ try(CLUSTER cluster = newCluster(NUM_SERVERS)) {
+ cluster.start();
+ runTestBasicAppendEntries(false, ReplicationLevel.MAJORITY, true, 10, cluster, LOG);
+ }
}
@Test
public void testBasicAppendEntriesWithAllReplication() throws Exception {
- runTestBasicAppendEntries(false, ReplicationLevel.ALL, false, 10, getCluster(), LOG);
+ try(CLUSTER cluster = newCluster(NUM_SERVERS)) {
+ cluster.start();
+ runTestBasicAppendEntries(false, ReplicationLevel.ALL, false, 10, cluster, LOG);
+ }
}
static void killAndRestartServer(RaftPeerId id, long killSleepMs, long restartSleepMs, MiniRaftCluster cluster, Logger LOG) {
@@ -188,7 +174,8 @@ public abstract class RaftBasicTests extends BaseTest {
@Test
public void testOldLeaderCommit() throws Exception {
LOG.info("Running testOldLeaderCommit");
- final MiniRaftCluster cluster = getCluster();
+ final CLUSTER cluster = newCluster(NUM_SERVERS);
+ cluster.start();
final RaftServerImpl leader = waitForLeader(cluster);
final RaftPeerId leaderId = leader.getId();
final long term = leader.getState().getCurrentTerm();
@@ -224,12 +211,14 @@ public abstract class RaftBasicTests extends BaseTest {
cluster.getServerAliveStream().map(s -> s.getState().getLog())
.forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages));
LOG.info("terminating testOldLeaderCommit test");
+ cluster.shutdown();
}
@Test
public void testOldLeaderNotCommit() throws Exception {
LOG.info("Running testOldLeaderNotCommit");
- final MiniRaftCluster cluster = getCluster();
+ final CLUSTER cluster = newCluster(NUM_SERVERS);
+ cluster.start();
final RaftPeerId leaderId = waitForLeader(cluster).getId();
List<RaftServerImpl> followers = cluster.getFollowers();
@@ -259,6 +248,7 @@ public abstract class RaftBasicTests extends BaseTest {
cluster.getServerAliveStream()
.map(s -> s.getState().getLog())
.forEach(log -> RaftTestUtil.checkLogEntries(log, messages, predicate));
+ cluster.shutdown();
}
static class Client4TestWithLoad extends Thread {
@@ -340,7 +330,10 @@ public abstract class RaftBasicTests extends BaseTest {
@Test
public void testWithLoad() throws Exception {
- testWithLoad(10, 500, false, getCluster(), LOG);
+ try(CLUSTER cluster = newCluster(NUM_SERVERS)) {
+ cluster.start();
+ testWithLoad(10, 500, false, cluster, LOG);
+ }
}
public static void testWithLoad(final int numClients, final int numMessages,
@@ -456,7 +449,10 @@ public abstract class RaftBasicTests extends BaseTest {
@Test
public void testDelayRequestIfLeaderStepDown() throws Exception {
- runTestDelayRequestIfLeaderStepDown(false, getCluster(), LOG);
+ try(CLUSTER cluster = newCluster(NUM_SERVERS)) {
+ cluster.start();
+ runTestDelayRequestIfLeaderStepDown(false, cluster, LOG);
+ }
}
static void runTestDelayRequestIfLeaderStepDown(boolean async, MiniRaftCluster cluster, Logger LOG) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 30d9d10..ee338ed 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -192,7 +192,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
GroupMismatchException.class);
testFailureCase("groupRemove(..) with another group id",
- () -> client.groupRemove(anotherGroup.getGroupId(), clusterGroup.getPeers().iterator().next().getId()),
+ () -> client.groupRemove(anotherGroup.getGroupId(), false, clusterGroup.getPeers().iterator().next().getId()),
GroupMismatchException.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 8667f50..e167f17 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -188,8 +188,8 @@ public interface RaftTestUtil {
if (e.getLogEntryBodyCase() == LogEntryProto.LogEntryBodyCase.SMLOGENTRY) {
LOG.info(ServerProtoUtils.toString(e) + ", " + e.getSmLogEntry().toString().trim().replace("\n", ", "));
entries.add(e);
- } else if (e.getLogEntryBodyCase() == LogEntryProto.LogEntryBodyCase.NOOP) {
- LOG.info("Found " + LogEntryProto.LogEntryBodyCase.NOOP + " at " + ti
+ } else if (e.getLogEntryBodyCase() == LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY) {
+ LOG.info("Found " + LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY + " at " + ti
+ ", ignoring it.");
} else {
throw new AssertionError("Unexpected LogEntryBodyCase " + e.getLogEntryBodyCase() + " at " + ti
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index 5310b94..682f2cb 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -23,10 +23,12 @@ import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.util.CheckedBiConsumer;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
@@ -35,6 +37,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
@@ -62,17 +65,14 @@ public abstract class GroupManagementBaseTest extends BaseTest {
}
@Test
- public void testMultiGroup() throws Exception {
+ public void testSingleGroupRestart() throws Exception {
final MiniRaftCluster cluster = getCluster(0);
LOG.info("Start testMultiGroup" + cluster.printServers());
- // Start server with an empty conf
- final RaftGroupId groupId = cluster.getGroupId();
- final RaftGroup group = new RaftGroup(groupId);
-
+ // Start server with null group
final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(3, 0))
.map(RaftPeerId::valueOf).collect(Collectors.toList());
- ids.forEach(id -> cluster.putNewServer(id, group, true));
+ ids.forEach(id -> cluster.putNewServer(id, null, true));
LOG.info("putNewServer: " + cluster.printServers());
cluster.start();
@@ -90,6 +90,17 @@ public abstract class GroupManagementBaseTest extends BaseTest {
client.groupAdd(newGroup, p.getId());
}
Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true));
+ TimeUnit.SECONDS.sleep(1);
+
+ // restart the servers with null group
+ LOG.info("restart servers");
+ for(RaftPeer p : newGroup.getPeers()) {
+ cluster.restartServer(p.getId(), null, false);
+ }
+
+ // the servers should retrieve the conf from the log.
+ Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true));
+
cluster.shutdown();
}
@@ -176,9 +187,16 @@ public abstract class GroupManagementBaseTest extends BaseTest {
final RaftGroup g = groups[i];
LOG.info(i + ") close " + cluster.printServers(g.getGroupId()));
for(RaftPeer p : g.getPeers()) {
+ final File root = cluster.getServer(p.getId()).getImpl(g.getGroupId()).getState().getStorage().getStorageDir().getRoot();
+ Assert.assertTrue(root.exists());
+ Assert.assertTrue(root.isDirectory());
+
+ final RaftClientReply r;
try (final RaftClient client = cluster.createClient(p.getId(), g)) {
- client.groupRemove(g.getGroupId(), p.getId());
+ r = client.groupRemove(g.getGroupId(), true, p.getId());
}
+ Assert.assertTrue(r.isSuccess());
+ Assert.assertFalse(root.exists());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 79017d4..6374a21 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -48,7 +48,6 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.util.Arrays.asList;
import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf;
import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
-import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.NOOP;
public abstract class RaftReconfigurationBaseTest extends BaseTest {
static {
@@ -168,8 +167,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
final MiniRaftCluster cluster = getCluster(3);
cluster.start();
try {
- RaftTestUtil.waitForLeader(cluster);
- final RaftPeerId leaderId = cluster.getLeader().getId();
+ final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
final RaftClient client = cluster.createClient(leaderId);
// submit some msgs before reconf
@@ -212,16 +210,19 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
Assert.assertTrue(reconf1.get());
Assert.assertTrue(reconf2.get());
waitAndCheckNewConf(cluster, finalPeers.get(), 2, null);
+ final RaftPeerId leader2 = RaftTestUtil.waitForLeader(cluster).getId();
// check configuration manager's internal state
// each reconf will generate two configurations: (old, new) and (new)
- cluster.getServerAliveStream()
- .forEach(server -> {
+ cluster.getServerAliveStream().forEach(server -> {
ConfigurationManager confManager =
(ConfigurationManager) Whitebox.getInternalState(server.getState(),
"configurationManager");
// each reconf will generate two configurations: (old, new) and (new)
- Assert.assertEquals(5, confManager.numOfConf());
+ // each leader change generates one configuration.
+ // expectedConf = 1 (init) + 2*2 (two conf changes) + #leader
+ final int expechedConf = leader2.equals(leaderId)? 6: 7;
+ Assert.assertEquals(expechedConf, confManager.numOfConf());
});
} finally {
cluster.shutdown();
@@ -546,7 +547,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
// find CONFIGURATIONENTRY, there may be NOOP before and after it.
final long confIndex = JavaUtils.attempt(() -> {
final long last = log.getLastEntryTermIndex().getIndex();
- for (long i = 1; i <= last; i++) {
+ for (long i = last; i >= 1; i--) {
if (log.get(i).getLogEntryBodyCase() == CONFIGURATIONENTRY) {
return i;
}
@@ -571,7 +572,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
// the old leader should have truncated the setConf from the log
JavaUtils.attempt(() -> log.getLastCommittedIndex() >= confIndex,
10, 500L, "COMMIT", LOG);
- Assert.assertEquals(NOOP, log.get(confIndex).getLogEntryBodyCase());
+ Assert.assertEquals(CONFIGURATIONENTRY, log.get(confIndex).getLogEntryBodyCase());
log2 = null;
} finally {
RaftStorageTestUtils.printLog(log2, s -> LOG.info(s));
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java
index b5a35d2..391a6fa 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java
@@ -17,30 +17,9 @@
*/
package org.apache.ratis.server.simulation;
-import org.apache.log4j.Level;
import org.apache.ratis.RaftBasicTests;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.util.LogUtils;
-
-import java.io.IOException;
-
-public class TestRaftWithSimulatedRpc extends RaftBasicTests {
- static {
- LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
- LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
- }
-
- private final MiniRaftClusterWithSimulatedRpc cluster;
-
- public TestRaftWithSimulatedRpc() throws IOException {
- cluster = MiniRaftClusterWithSimulatedRpc.FACTORY.newCluster(
- NUM_SERVERS, getProperties());
- }
-
- @Override
- public MiniRaftClusterWithSimulatedRpc getCluster() {
- return cluster;
- }
+public class TestRaftWithSimulatedRpc
+ extends RaftBasicTests<MiniRaftClusterWithSimulatedRpc>
+ implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
}