You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/08/01 06:33:10 UTC
incubator-ratis git commit: RATIS-96. LeaderState computeLastCommitted may throw ArrayIndexOutOfBoundsException. Contributed by Tsz Wo Nicholas Sze.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 89e64f212 -> fb702de7c
RATIS-96. LeaderState computeLastCommitted may throw ArrayIndexOutOfBoundsException. Contributed by Tsz Wo Nicholas Sze.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/fb702de7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/fb702de7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/fb702de7
Branch: refs/heads/master
Commit: fb702de7cbccbaf0a2aa9dd1badb04f74194d73b
Parents: 89e64f2
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Jul 31 23:33:03 2017 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon Jul 31 23:33:03 2017 -0700
----------------------------------------------------------------------
.../ratis/client/impl/RaftClientImpl.java | 4 +-
.../org/apache/ratis/protocol/RaftGroup.java | 2 +-
.../org/apache/ratis/util/CollectionUtils.java | 6 +-
.../java/org/apache/ratis/util/JavaUtils.java | 44 +++++++
.../java/org/apache/ratis/util/LifeCycle.java | 13 +-
.../ratis/grpc/client/AppendStreamer.java | 3 +
.../apache/ratis/server/impl/LeaderState.java | 23 +++-
.../ratis/server/impl/RaftServerImpl.java | 25 ++--
.../apache/ratis/server/impl/ServerState.java | 8 +-
.../ratis/server/storage/RaftStorage.java | 15 ++-
.../java/org/apache/ratis/MiniRaftCluster.java | 53 ++++++--
.../java/org/apache/ratis/RaftTestUtil.java | 14 ++-
.../server/impl/ReinitializationBaseTest.java | 125 ++++++++++++++++++-
13 files changed, 282 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 1082f38..c7ad935 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -113,10 +113,8 @@ final class RaftClientImpl implements RaftClient {
throws InterruptedIOException, StateMachineException {
for(;;) {
final RaftClientRequest request = supplier.get();
- LOG.debug("{}: {}", clientId, request);
final RaftClientReply reply = sendRequest(request);
if (reply != null) {
- LOG.debug("{}: {}", clientId, reply);
return reply;
}
@@ -133,6 +131,7 @@ final class RaftClientImpl implements RaftClient {
private RaftClientReply sendRequest(RaftClientRequest request)
throws StateMachineException {
+ LOG.debug("{}: {}", clientId, request);
RaftClientReply reply = null;
try {
reply = clientRpc.sendRequest(request);
@@ -140,6 +139,7 @@ final class RaftClientImpl implements RaftClient {
handleIOException(request, ioe, null);
}
if (reply != null) {
+ LOG.debug("{}: {}", clientId, reply);
if (reply.isNotLeader()) {
handleNotLeaderException(request, reply.getNotLeaderException());
return null;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
index 3ec0fdc..6e870b1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
@@ -50,6 +50,6 @@ public class RaftGroup {
@Override
public String toString() {
- return groupId + ":" + Arrays.asList(peers);
+ return groupId + ":" + peers;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index d8eb674..5ea8d4f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -59,11 +59,7 @@ public interface CollectionUtils {
.filter(e -> !given.equals(e))
.collect(Collectors.toList());
final int size = list.size();
- if (size == 0) {
- throw new IllegalArgumentException(
- "All elements in the iteration equals to the given element.");
- }
- return list.get(ThreadLocalRandom.current().nextInt(size));
+ return size == 0? null: list.get(ThreadLocalRandom.current().nextInt(size));
}
static <INPUT, OUTPUT> Iterable<OUTPUT> as(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 5da2012..0664aec 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -23,9 +23,11 @@ package org.apache.ratis.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
+import java.util.function.Supplier;
/**
* General Java utility methods.
@@ -60,4 +62,46 @@ public interface JavaUtils {
}
consumer.accept(t);
}
+
+ /**
+ * Create a memoized supplier which gets a value by invoking the initializer once
+ * and then keeps returning the same value as its supplied results.
+ *
+ * @param initializer to supply at most one non-null value.
+ * @param <T> The supplier result type.
+ * @return a memoized supplier which is thread-safe.
+ */
+ static <T> Supplier<T> memoize(Supplier<T> initializer) {
+ Objects.requireNonNull(initializer, "initializer == null");
+ return new Supplier<T>() {
+ private volatile T value = null;
+
+ @Override
+ public T get() {
+ T v = value;
+ if (v == null) {
+ synchronized (this) {
+ v = value;
+ if (v == null) {
+ v = value = Objects.requireNonNull(initializer.get(),
+ "initializer.get() returns null");
+ }
+ }
+ }
+ return v;
+ }
+ };
+ }
+
+ Supplier<ThreadGroup> ROOT_THREAD_GROUP = memoize(() -> {
+ for (ThreadGroup g = Thread.currentThread().getThreadGroup(), p;; g = p) {
+ if ((p = g.getParent()) == null) {
+ return g;
+ }
+ }
+ });
+
+ static ThreadGroup getRootThreadGroup() {
+ return ROOT_THREAD_GROUP.get();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
index 5246aba..c6ace9e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
@@ -23,6 +23,7 @@ import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -152,11 +153,17 @@ public class LifeCycle {
/** Assert if the current state equals to one of the expected states. */
public void assertCurrentState(State... expected) {
+ assertCurrentState((n, c) -> new IllegalStateException("STATE MISMATCHED: In "
+ + n + ", current state " + c + " is not one of the expected states "
+ + Arrays.toString(expected)), expected);
+ }
+
+ /** Assert if the current state equals to one of the expected states. */
+ public <T extends Throwable> void assertCurrentState(
+ BiFunction<String, State, T> newThrowable, State... expected) throws T {
final State c = getCurrentState();
if (!c.isOneOf(expected)) {
- throw new IllegalStateException("STATE MISMATCHED: In " + name
- + ", current state " + c + " is not one of the expected states "
- + Arrays.toString(expected));
+ throw newThrowable.apply(name, c);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
index 5d01235..e7d2cd0 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -118,6 +118,9 @@ public class AppendStreamer implements Closeable {
leaderId = peers.keySet().iterator().next();
} else {
leaderId = CollectionUtils.random(oldLeader, peers.keySet());
+ if (leaderId == null) {
+ leaderId = oldLeader;
+ }
}
}
LOG.debug("{} switches leader from {} to {}. suggested leader: {}", this,
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index e9784bb..0415aab 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -397,8 +397,14 @@ public class LeaderState {
private void updateLastCommitted() {
final RaftPeerId selfId = server.getId();
final RaftConfiguration conf = server.getRaftConf();
- long majorityInNewConf = computeLastCommitted(voterLists.get(0),
- conf.containsInConf(selfId));
+
+ final List<FollowerInfo> followers = voterLists.get(0);
+ final boolean includeSelf = conf.containsInConf(selfId);
+ if (followers.isEmpty() && !includeSelf) {
+ return;
+ }
+
+ final long majorityInNewConf = computeLastCommitted(followers, includeSelf);
final long oldLastCommitted = raftLog.getLastCommittedIndex();
final TermIndex[] entriesToCommit;
if (!conf.isTransitional()) {
@@ -409,8 +415,13 @@ public class LeaderState {
Math.max(majorityInNewConf, oldLastCommitted) + 1);
server.getState().updateStatemachine(majorityInNewConf, currentTerm);
} else { // configuration is in transitional state
- long majorityInOldConf = computeLastCommitted(voterLists.get(1),
- conf.containsInOldConf(selfId));
+ final List<FollowerInfo> oldFollowers = voterLists.get(1);
+ final boolean includeSelfInOldConf = conf.containsInOldConf(selfId);
+ if (oldFollowers.isEmpty() && !includeSelfInOldConf) {
+ return;
+ }
+
+ final long majorityInOldConf = computeLastCommitted(oldFollowers, includeSelfInOldConf);
final long majority = Math.min(majorityInNewConf, majorityInOldConf);
entriesToCommit = raftLog.getEntries(oldLastCommitted + 1,
Math.max(majority, oldLastCommitted) + 1);
@@ -477,6 +488,10 @@ public class LeaderState {
private long computeLastCommitted(List<FollowerInfo> followers,
boolean includeSelf) {
final int length = includeSelf ? followers.size() + 1 : followers.size();
+ if (length == 0) {
+ throw new IllegalArgumentException("followers.size() == "
+ + followers.size() + " and includeSelf == " + includeSelf);
+ }
final long[] indices = new long[length];
for (int i = 0; i < followers.size(); i++) {
indices[i] = followers.get(i).getMatchIndex();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 0f8e692..1a73cba 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -25,7 +25,6 @@ import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.ratis.shaded.com.google.common.base.Supplier;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
@@ -41,6 +40,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import static org.apache.ratis.server.impl.ServerProtoUtils.toRaftConfiguration;
import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*;
@@ -124,7 +124,7 @@ public class RaftServerImpl implements RaftServerProtocol,
maxTimeoutMs - minTimeoutMs + 1);
}
- RaftGroupId getGroupId() {
+ public RaftGroupId getGroupId() {
return groupId;
}
@@ -318,7 +318,8 @@ public class RaftServerImpl implements RaftServerProtocol,
@Override
public String toString() {
- return role + " " + state + " " + lifeCycle.getCurrentState();
+ return String.format("%8s ", role) + groupId + " " + state
+ + " " + lifeCycle.getCurrentState();
}
/**
@@ -367,14 +368,20 @@ public class RaftServerImpl implements RaftServerProtocol,
peers.toArray(new RaftPeer[peers.size()]));
}
+ private void assertLifeCycleState(LifeCycle.State... expected) throws IOException {
+ lifeCycle.assertCurrentState((n, c) -> new IOException("Server " + n
+ + " is not " + Arrays.asList(expected) + ": current state is " + c),
+ expected);
+ }
+
/**
* Handle a normal update request from client.
*/
private CompletableFuture<RaftClientReply> appendTransaction(
RaftClientRequest request, TransactionContext context,
- RetryCache.CacheEntry cacheEntry) throws RaftException {
+ RetryCache.CacheEntry cacheEntry) throws IOException {
LOG.debug("{}: receive client request({})", getId(), request);
- lifeCycle.assertCurrentState(RUNNING);
+ assertLifeCycleState(RUNNING);
CompletableFuture<RaftClientReply> reply;
final PendingRequest pending;
@@ -486,7 +493,7 @@ public class RaftServerImpl implements RaftServerProtocol,
public CompletableFuture<RaftClientReply> setConfigurationAsync(
SetConfigurationRequest request) throws IOException {
LOG.debug("{}: receive setConfiguration({})", getId(), request);
- lifeCycle.assertCurrentState(RUNNING);
+ assertLifeCycleState(RUNNING);
CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null);
if (reply != null) {
return reply;
@@ -560,7 +567,7 @@ public class RaftServerImpl implements RaftServerProtocol,
candidateId, candidateTerm, candidateLastEntry);
LOG.debug("{}: receive requestVote({}, {}, {})",
getId(), candidateId, candidateTerm, candidateLastEntry);
- lifeCycle.assertCurrentState(RUNNING);
+ assertLifeCycleState(RUNNING);
boolean voteGranted = false;
boolean shouldShutdown = false;
@@ -661,7 +668,7 @@ public class RaftServerImpl implements RaftServerProtocol,
+ leaderTerm + ", " + previous + ", " + leaderCommit + ", "
+ initializing + ServerProtoUtils.toString(entries));
- lifeCycle.assertCurrentState(STARTING, RUNNING);
+ assertLifeCycleState(STARTING, RUNNING);
try {
validateEntries(leaderTerm, previous, entries);
@@ -760,7 +767,7 @@ public class RaftServerImpl implements RaftServerProtocol,
leaderId, request);
LOG.debug("{}: receive installSnapshot({})", getId(), request);
- lifeCycle.assertCurrentState(STARTING, RUNNING);
+ assertLifeCycleState(STARTING, RUNNING);
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 53363cd..5ab1517 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -30,7 +30,6 @@ import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
-import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import java.io.Closeable;
@@ -85,7 +84,7 @@ public class ServerState implements Closeable {
RaftConfiguration initialConf = RaftConfiguration.newBuilder()
.setConf(group.getPeers()).build();
configurationManager = new ConfigurationManager(initialConf);
- storage = new RaftStorage(prop, RaftServerConstants.StartupOption.REGULAR);
+ storage = new RaftStorage(prop, group.getGroupId(), RaftServerConstants.StartupOption.REGULAR);
snapshotManager = new SnapshotManager(storage, id);
long lastApplied = initStatemachine(stateMachine, prop);
@@ -231,10 +230,7 @@ public class ServerState implements Closeable {
// leader and term later
return true;
}
- Preconditions.assertTrue(this.leaderId.equals(leaderId),
- "selfId:%s, this.leaderId:%s, received leaderId:%s",
- selfId, this.leaderId, leaderId);
- return true;
+ return this.leaderId.equals(leaderId);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
index db8a196..0b38a31 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
@@ -18,6 +18,7 @@
package org.apache.ratis.server.storage;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState;
@@ -44,9 +45,17 @@ public class RaftStorage implements Closeable {
public RaftStorage(RaftProperties prop, RaftServerConstants.StartupOption option)
throws IOException {
- final String dir = RaftServerConfigKeys.storageDir(prop);
- storageDir = new RaftStorageDirectory(
- new File(FileUtils.stringAsURI(dir).getPath()));
+ this(prop, null, option);
+ }
+
+ public RaftStorage(RaftProperties prop, RaftGroupId groupId, RaftServerConstants.StartupOption option)
+ throws IOException {
+ final String dirStr = RaftServerConfigKeys.storageDir(prop);
+ File dir = new File(FileUtils.stringAsURI(dirStr).getPath());
+ if (groupId != null) {
+ dir = new File(dir, groupId.toString());
+ }
+ storageDir = new RaftStorageDirectory(dir);
if (option == RaftServerConstants.StartupOption.FORMAT) {
if (storageDir.analyzeStorage(false) == StorageState.NON_EXISTENT) {
throw new IOException("Cannot format " + storageDir);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 577ae16..becbd0d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -100,11 +100,11 @@ public abstract class MiniRaftCluster {
}
public static RaftGroup initRaftGroup(Collection<String> ids) {
- List<RaftPeer> peers = ids.stream()
+ final RaftPeer[] peers = ids.stream()
.map(RaftPeerId::valueOf)
.map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
- .collect(Collectors.toList());
- return new RaftGroup(RaftGroupId.createId(), peers.toArray(new RaftPeer[peers.size()]));
+ .toArray(RaftPeer[]::new);
+ return new RaftGroup(RaftGroupId.createId(), peers);
}
private static String getBaseDirectory() {
@@ -227,7 +227,7 @@ public abstract class MiniRaftCluster {
return ReflectionUtils.newInstance(smClass);
}
- public static Collection<RaftPeer> toRaftPeers(
+ public static List<RaftPeer> toRaftPeers(
Collection<RaftServerProxy> servers) {
return servers.stream()
.map(MiniRaftCluster::toRaftPeer)
@@ -308,11 +308,28 @@ public abstract class MiniRaftCluster {
}
public String printServers() {
- StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n");
- for (RaftServer s : servers.values()) {
- b.append(" ");
- b.append(s).append("\n");
+ return printServers(null);
+ }
+
+ public String printServers(RaftGroupId groupId) {
+ final StringBuilder b = new StringBuilder("printing ");
+ if (groupId != null) {
+ b.append(groupId);
+ } else {
+ b.append("ALL groups");
}
+ getServers().stream().filter(
+ s -> {
+ if (groupId == null) {
+ return true;
+ }
+ try {
+ return groupId.equals(s.getImpl().getGroupId());
+ } catch (IOException e) {
+ return false;
+ }
+ })
+ .forEach(s -> b.append("\n ").append(s));
return b.toString();
}
@@ -332,10 +349,20 @@ public abstract class MiniRaftCluster {
}
public RaftServerImpl getLeader() {
+ return getLeader((RaftGroupId)null);
+ }
+
+ public RaftServerImpl getLeader(RaftGroupId groupId) {
+ Stream<RaftServerImpl> stream = getServerAliveStream();
+ if (groupId != null) {
+ stream = stream.filter(s -> groupId.equals(s.getGroupId()));
+ }
+ return getLeader(stream);
+ }
+
+ static RaftServerImpl getLeader(Stream<RaftServerImpl> serverAliveStream) {
final List<RaftServerImpl> leaders = new ArrayList<>();
- getServerAliveStream()
- .filter(RaftServerImpl::isLeader)
- .forEach(s -> {
+ serverAliveStream.filter(RaftServerImpl::isLeader).forEach(s -> {
if (leaders.isEmpty()) {
leaders.add(s);
} else {
@@ -352,7 +379,7 @@ public abstract class MiniRaftCluster {
if (leaders.isEmpty()) {
return null;
} else if (leaders.size() > 1) {
- throw new IllegalStateException(printServers() + leaders
+ throw new IllegalStateException(leaders
+ ", leaders.size() = " + leaders.size() + " > 1");
}
return leaders.get(0);
@@ -388,7 +415,7 @@ public abstract class MiniRaftCluster {
return servers.get(id);
}
- public Collection<RaftPeer> getPeers() {
+ public List<RaftPeer> getPeers() {
return toRaftPeers(getServers());
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 9086289..ec39073 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -18,6 +18,7 @@
package org.apache.ratis;
import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -64,22 +65,27 @@ public class RaftTestUtil {
}
public static RaftServerImpl waitForLeader(
- MiniRaftCluster cluster, boolean tolerateMultipleLeaders)
+ MiniRaftCluster cluster, boolean tolerateMultipleLeaders) throws InterruptedException {
+ return waitForLeader(cluster, tolerateMultipleLeaders, null);
+ }
+
+ public static RaftServerImpl waitForLeader(
+ MiniRaftCluster cluster, boolean tolerateMultipleLeaders, RaftGroupId groupId)
throws InterruptedException {
final long sleepTime = (cluster.getMaxTimeout() * 3) >> 1;
- LOG.info(cluster.printServers());
+ LOG.info(cluster.printServers(groupId));
RaftServerImpl leader = null;
for(int i = 0; leader == null && i < 10; i++) {
Thread.sleep(sleepTime);
try {
- leader = cluster.getLeader();
+ leader = cluster.getLeader(groupId);
} catch(IllegalStateException e) {
if (!tolerateMultipleLeaders) {
throw e;
}
}
}
- LOG.info(cluster.printServers());
+ LOG.info(cluster.printServers(groupId));
return leader;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/fb702de7/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
index 22e0564..20b8680 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
@@ -21,11 +21,13 @@ import org.apache.log4j.Level;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.ConfUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -34,14 +36,18 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public abstract class ReinitializationBaseTest {
static {
LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
- LogUtils.setLogLevel(RaftClient.LOG, Level.TRACE);
+ LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+ LogUtils.setLogLevel(ConfUtils.LOG, Level.OFF);
}
static final Logger LOG = LoggerFactory.getLogger(ReinitializationBaseTest.class);
@@ -59,7 +65,8 @@ public abstract class ReinitializationBaseTest {
LOG.info("Start testReinitialize" + cluster.printServers());
// Start server with an empty conf
- RaftGroup group = new RaftGroup(RaftGroupId.createId(), new RaftPeer[0]);
+ final RaftGroupId groupId = RaftGroupId.createId();
+ final RaftGroup group = new RaftGroup(groupId, RaftPeer.EMPTY_PEERS);
final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(3, 0))
.map(RaftPeerId::valueOf).collect(Collectors.toList());
@@ -77,9 +84,121 @@ public abstract class ReinitializationBaseTest {
final RaftPeer[] peers = cluster.getPeers().toArray(RaftPeer.EMPTY_PEERS);
for(RaftPeer p : peers) {
final RaftClient client = cluster.createClient(p.getId(),
- new RaftGroup(RaftGroupId.createId(), new RaftPeer[]{p}));
+ new RaftGroup(groupId, new RaftPeer[]{p}));
client.reinitialize(peers, p.getId());
}
Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true));
+ cluster.shutdown();
+ }
+
+ @Test
+ public void testReinitialize5Nodes() throws Exception {
+ final int[] idIndex = {3, 4, 5};
+ runTestReinitializeMultiGroups(idIndex, 0);
+ }
+
+ @Test
+ public void testReinitialize7Nodes() throws Exception {
+ final int[] idIndex = {1, 6, 7};
+ runTestReinitializeMultiGroups(idIndex, 1);
+ }
+
+ @Test
+ public void testReinitialize9Nodes() throws Exception {
+ final int[] idIndex = {5, 8, 9};
+ runTestReinitializeMultiGroups(idIndex, 0);
+ }
+
+ private void runTestReinitializeMultiGroups(int[] idIndex, int chosen) throws Exception {
+ printThreadCount(null, "init");
+ final MiniRaftCluster cluster = getCluster(0);
+
+ if (chosen < 0) {
+ chosen = ThreadLocalRandom.current().nextInt(idIndex.length);
+ }
+ final String type = cluster.getClass().getSimpleName()
+ + Arrays.toString(idIndex) + "chosen=" + chosen;
+ LOG.info("\n\nrunTestReinitializeMultiGroups with " + type + ": " + cluster.printServers());
+
+ // Start server with an empty conf
+ final RaftGroup emptyGroup = new RaftGroup(RaftGroupId.createId(), RaftPeer.EMPTY_PEERS);
+
+ final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(idIndex[idIndex.length - 1], 0))
+ .map(RaftPeerId::valueOf).collect(Collectors.toList());
+ ids.forEach(id -> cluster.putNewServer(id, emptyGroup, true));
+ LOG.info("putNewServer: " + cluster.printServers());
+
+ cluster.start();
+ LOG.info("start: " + cluster.printServers());
+
+ // Make sure that there are no leaders.
+ TimeUnit.SECONDS.sleep(1);
+ Assert.assertNull(cluster.getLeader());
+
+ // Reinitialize servers to three groups
+ final List<RaftPeer> allPeers = cluster.getPeers();
+ Collections.sort(allPeers, Comparator.comparing(p -> p.getId().toString()));
+ final RaftGroup[] groups = new RaftGroup[idIndex.length];
+ for (int i = 0; i < idIndex.length; i++) {
+ final RaftGroupId gid = RaftGroupId.createId();
+ final int previous = i == 0 ? 0 : idIndex[i - 1];
+ final RaftPeer[] peers = allPeers.subList(previous, idIndex[i]).toArray(RaftPeer.EMPTY_PEERS);
+ groups[i] = new RaftGroup(gid, peers);
+
+ LOG.info(i + ") starting " + groups[i]);
+ for(RaftPeer p : peers) {
+ try(final RaftClient client = cluster.createClient(p.getId(), groups[i])) {
+ client.reinitialize(peers, p.getId());
+ }
+ }
+ Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true, gid));
+ }
+ printThreadCount(type, "start groups");
+ LOG.info("start groups: " + cluster.printServers());
+
+ // randomly close two of the groups (i.e. reinitialize to empty peers)
+ LOG.info("chosen = " + chosen + ", " + groups[chosen]);
+
+ for (int i = 0; i < groups.length; i++) {
+ if (i != chosen) {
+ final RaftGroup g = groups[i];
+ LOG.info(i + ") close " + cluster.printServers(g.getGroupId()));
+ for(RaftPeer p : g.getPeers()) {
+ try (final RaftClient client = cluster.createClient(p.getId(), g)) {
+ client.reinitialize(RaftPeer.EMPTY_PEERS, p.getId());
+ }
+ }
+ }
+ }
+ printThreadCount(type, "close groups");
+ LOG.info("close groups: " + cluster.printServers());
+
+ // update chosen group to use all the peers
+ final RaftPeer[] array = allPeers.toArray(RaftPeer.EMPTY_PEERS);
+ for(int i = 0; i < groups.length; i++) {
+ LOG.info(i + ") update " + cluster.printServers(groups[i].getGroupId()));
+ if (i == chosen) {
+ try (final RaftClient client = cluster.createClient(null, groups[i])) {
+ client.setConfiguration(array);
+ }
+ } else {
+ for(RaftPeer p : groups[i].getPeers()) {
+ try (final RaftClient client = cluster.createClient(p.getId(), groups[i])) {
+ client.reinitialize(array, p.getId());
+ }
+ }
+ }
+ }
+ Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true));
+ LOG.info("update groups: " + cluster.printServers());
+ printThreadCount(type, "update groups");
+
+ cluster.shutdown();
+ printThreadCount(type, "shutdown");
+ }
+
+ static void printThreadCount(String type, String label) {
+ System.out.println("| " + type + " | " + label + " | "
+ + JavaUtils.getRootThreadGroup().activeCount() + " |");
}
}