You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/04/10 11:11:17 UTC
incubator-ratis git commit: RATIS-208. Allow client to specify
replication level in a request. Contributed by Kit Hui
Repository: incubator-ratis
Updated Branches:
refs/heads/master 00f64b4c1 -> c692bf201
RATIS-208. Allow client to specify replication level in a request. Contributed by Kit Hui
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c692bf20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c692bf20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c692bf20
Branch: refs/heads/master
Commit: c692bf201d01a67e2c85efcd625e0bf80c96758b
Parents: 00f64b4
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Tue Apr 10 19:10:30 2018 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Tue Apr 10 19:10:30 2018 +0800
----------------------------------------------------------------------
.../org/apache/ratis/client/RaftClient.java | 23 ++++++-
.../ratis/client/impl/RaftClientImpl.java | 9 +--
.../ratis/protocol/RaftClientRequest.java | 23 ++++---
ratis-proto-shaded/src/main/proto/Raft.proto | 6 ++
.../apache/ratis/server/impl/LeaderState.java | 49 ++++++++------
.../ratis/server/impl/PendingRequest.java | 18 +++++-
.../ratis/server/impl/PendingRequests.java | 67 +++++++++++++++++++-
.../java/org/apache/ratis/MiniRaftCluster.java | 6 +-
.../java/org/apache/ratis/RaftAsyncTests.java | 18 +++++-
.../java/org/apache/ratis/RaftBasicTests.java | 39 ++++++++++--
10 files changed, 214 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 84fec9e..5562f59 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -23,6 +23,7 @@ import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,8 +46,17 @@ public interface RaftClient extends Closeable {
* Async call to send the given message to the raft service.
* The message may change the state of the service.
* For readonly messages, use {@link #sendReadOnlyAsync(Message)} instead.
+ *
+ * @param message The request message.
+ * @param replication The replication level required.
+ * @return a future of the reply.
*/
- CompletableFuture<RaftClientReply> sendAsync(Message message);
+ CompletableFuture<RaftClientReply> sendAsync(Message message, ReplicationLevel replication);
+
+ /** The same as sendAsync(message, MAJORITY). */
+ default CompletableFuture<RaftClientReply> sendAsync(Message message) {
+ return sendAsync(message, ReplicationLevel.MAJORITY);
+ }
/** Async call to send the given readonly message to the raft service. */
CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message);
@@ -58,8 +68,17 @@ public interface RaftClient extends Closeable {
* Send the given message to the raft service.
* The message may change the state of the service.
* For readonly messages, use {@link #sendReadOnly(Message)} instead.
+ *
+ * @param message The request message.
+ * @param replication The replication level required.
+ * @return the reply.
*/
- RaftClientReply send(Message message) throws IOException;
+ RaftClientReply send(Message message, ReplicationLevel replication) throws IOException;
+
+ /** The same as send(message, MAJORITY). */
+ default RaftClientReply send(Message message) throws IOException {
+ return send(message, ReplicationLevel.MAJORITY);
+ }
/** Send the given readonly message to the raft service. */
RaftClientReply sendReadOnly(Message message) throws IOException;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index f0abb10..e8a897b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -22,6 +22,7 @@ import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.util.*;
import java.io.IOException;
@@ -129,8 +130,8 @@ final class RaftClientImpl implements RaftClient {
}
@Override
- public CompletableFuture<RaftClientReply> sendAsync(Message message) {
- return sendAsync(RaftClientRequest.writeRequestType(), message, null);
+ public CompletableFuture<RaftClientReply> sendAsync(Message message, ReplicationLevel replication) {
+ return sendAsync(RaftClientRequest.writeRequestType(replication), message, null);
}
@Override
@@ -168,8 +169,8 @@ final class RaftClientImpl implements RaftClient {
}
@Override
- public RaftClientReply send(Message message) throws IOException {
- return send(RaftClientRequest.writeRequestType(), message, null);
+ public RaftClientReply send(Message message, ReplicationLevel replication) throws IOException {
+ return send(RaftClientRequest.writeRequestType(replication), message, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 072a854..232c51d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -17,10 +17,7 @@
*/
package org.apache.ratis.protocol;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.ReadRequestTypeProto;
-import org.apache.ratis.shaded.proto.RaftProtos.StaleReadRequestTypeProto;
-import org.apache.ratis.shaded.proto.RaftProtos.WriteRequestTypeProto;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.util.Preconditions;
import java.util.Objects;
@@ -31,12 +28,20 @@ import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Ty
* Request from client to server
*/
public class RaftClientRequest extends RaftClientMessage {
- private static final Type DEFAULT_WRITE = new Type(WriteRequestTypeProto.getDefaultInstance());
+ private static final Type WRITE_DEFAULT = new Type(WriteRequestTypeProto.getDefaultInstance());
+ private static final Type WRITE_ALL = new Type(
+ WriteRequestTypeProto.newBuilder().setReplication(ReplicationLevel.ALL).build());
+
private static final Type DEFAULT_READ = new Type(ReadRequestTypeProto.getDefaultInstance());
private static final Type DEFAULT_STALE_READ = new Type(StaleReadRequestTypeProto.getDefaultInstance());
- public static Type writeRequestType() {
- return DEFAULT_WRITE;
+ public static Type writeRequestType(ReplicationLevel replication) {
+ switch (replication) {
+ case MAJORITY: return WRITE_DEFAULT;
+ case ALL: return WRITE_ALL;
+ default:
+ throw new IllegalArgumentException("Unexpected replication: " + replication);
+ }
}
public static Type readRequestType() {
@@ -51,7 +56,7 @@ public class RaftClientRequest extends RaftClientMessage {
/** The type of a request (oneof write, read, staleRead; see the message RaftClientRequestProto). */
public static class Type {
public static Type valueOf(WriteRequestTypeProto write) {
- return DEFAULT_WRITE;
+ return writeRequestType(write.getReplication());
}
public static Type valueOf(ReadRequestTypeProto read) {
@@ -136,7 +141,7 @@ public class RaftClientRequest extends RaftClientMessage {
public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
RaftGroupId groupId, long callId) {
- this(clientId, serverId, groupId, callId, 0L, null, writeRequestType());
+ this(clientId, serverId, groupId, callId, 0L, null, WRITE_DEFAULT);
}
public RaftClientRequest(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index 0fa845b..3f0baf8 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -170,7 +170,13 @@ message ClientMessageEntryProto {
bytes content = 1;
}
+enum ReplicationLevel {
+ MAJORITY = 0;
+ ALL = 1;
+}
+
message WriteRequestTypeProto {
+ ReplicationLevel replication = 1;
}
message ReadRequestTypeProto {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index ca27b7e..309ebf5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -448,16 +448,14 @@ public class LeaderState {
return;
}
- final long majorityInNewConf = computeLastCommitted(followers, includeSelf);
- final long oldLastCommitted = raftLog.getLastCommittedIndex();
- final TermIndex[] entriesToCommit;
+ final long[] indicesInNewConf = computeCommittedIndices(followers, includeSelf);
+ final long majorityInNewConf = getMajority(indicesInNewConf);
+ final long majority;
+ final long min;
+
if (!conf.isTransitional()) {
- // copy the entries that may get committed out of the raftlog, to prevent
- // the possible race that the log gets purged after the statemachine does
- // a snapshot
- entriesToCommit = raftLog.getEntries(oldLastCommitted + 1,
- Math.max(majorityInNewConf, oldLastCommitted) + 1);
- server.getState().updateStatemachine(majorityInNewConf, currentTerm);
+ majority = majorityInNewConf;
+ min = indicesInNewConf[0];
} else { // configuration is in transitional state
final List<FollowerInfo> oldFollowers = voterLists.get(1);
final boolean includeSelfInOldConf = conf.containsInOldConf(selfId);
@@ -465,13 +463,23 @@ public class LeaderState {
return;
}
- final long majorityInOldConf = computeLastCommitted(oldFollowers, includeSelfInOldConf);
- final long majority = Math.min(majorityInNewConf, majorityInOldConf);
- entriesToCommit = raftLog.getEntries(oldLastCommitted + 1,
- Math.max(majority, oldLastCommitted) + 1);
+ final long[] indicesInOldConf = computeCommittedIndices(oldFollowers, includeSelfInOldConf);
+ final long majorityInOldConf = getMajority(indicesInOldConf);
+ majority = Math.min(majorityInNewConf, majorityInOldConf);
+ min = Math.min(indicesInNewConf[0], indicesInOldConf[0]);
+ }
+
+ final long oldLastCommitted = raftLog.getLastCommittedIndex();
+ if (majority > oldLastCommitted) {
+ // copy the entries out from the raftlog, in order to prevent that
+ // the log gets purged after the statemachine does a snapshot
+ final TermIndex[] entriesToCommit = raftLog.getEntries(
+ oldLastCommitted + 1, majority + 1);
server.getState().updateStatemachine(majority, currentTerm);
+ checkAndUpdateConfiguration(entriesToCommit);
}
- checkAndUpdateConfiguration(entriesToCommit);
+
+ pendingRequests.checkDelayedReplies(min);
}
private boolean committedConf(TermIndex[] entries) {
@@ -529,8 +537,11 @@ public class LeaderState {
notifySenders();
}
- private long computeLastCommitted(List<FollowerInfo> followers,
- boolean includeSelf) {
+ static long getMajority(long[] indices) {
+ return indices[(indices.length - 1) / 2];
+ }
+
+ private long[] computeCommittedIndices(List<FollowerInfo> followers, boolean includeSelf) {
final int length = includeSelf ? followers.size() + 1 : followers.size();
if (length == 0) {
throw new IllegalArgumentException("followers.size() == "
@@ -546,7 +557,7 @@ public class LeaderState {
}
Arrays.sort(indices);
- return indices[(indices.length - 1) / 2];
+ return indices;
}
private List<List<FollowerInfo>> divideFollowers(RaftConfiguration conf) {
@@ -567,7 +578,9 @@ public class LeaderState {
}
void replyPendingRequest(long logIndex, RaftClientReply reply) {
- pendingRequests.replyPendingRequest(logIndex, reply);
+ if (!pendingRequests.replyPendingRequest(logIndex, reply)) {
+ submitUpdateStateEvent(UPDATE_COMMIT_EVENT);
+ }
}
TransactionContext getTransactionContext(long index) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
index b63cd01..10cd95f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -17,10 +17,14 @@
*/
package org.apache.ratis.server.impl;
-import org.apache.ratis.protocol.*;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.Preconditions;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
public class PendingRequest implements Comparable<PendingRequest> {
@@ -29,6 +33,8 @@ public class PendingRequest implements Comparable<PendingRequest> {
private final TransactionContext entry;
private final CompletableFuture<RaftClientReply> future;
+ private volatile RaftClientReply delayed;
+
PendingRequest(long index, RaftClientRequest request,
TransactionContext entry) {
this.index = index;
@@ -70,6 +76,16 @@ public class PendingRequest implements Comparable<PendingRequest> {
future.complete(r);
}
+ synchronized void setDelayedReply(RaftClientReply r) {
+ Objects.requireNonNull(r);
+ Preconditions.assertTrue(delayed == null);
+ delayed = r;
+ }
+
+ synchronized void completeDelayedReply() {
+ setReply(delayed);
+ }
+
TransactionContext setNotLeaderException(NotLeaderException nle) {
setReply(new RaftClientReply(getRequest(), nle, null));
return getEntry();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index b418658..c02d7c3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -18,16 +18,20 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
+import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
class PendingRequests {
@@ -71,14 +75,63 @@ class PendingRequests {
}
}
+ private static class DelayedReplies {
+ private final String name;
+ private final PriorityQueue<PendingRequest> q = new PriorityQueue<>();
+ private AtomicLong allAckedIndex = new AtomicLong();
+
+ private DelayedReplies(Object name) {
+ this.name = name + "-" + getClass().getSimpleName();
+ }
+
+ boolean delay(PendingRequest request, RaftClientReply reply) {
+ if (request.getIndex() <= allAckedIndex.get()) {
+ return false; // delay is not required.
+ }
+
+ LOG.debug("{}: delay request {}", name, request);
+ request.setDelayedReply(reply);
+ final boolean offered;
+ synchronized (q) {
+ offered = q.offer(request);
+ }
+ Preconditions.assertTrue(offered);
+ return true;
+ }
+
+ void update(final long allAcked) {
+ final long old = allAckedIndex.getAndUpdate(n -> allAcked > n? allAcked : n);
+ if (allAcked <= old) {
+ return;
+ }
+
+ LOG.debug("{}: update allAckedIndex {} -> {}", name, old, allAcked);
+ for(;;) {
+ final PendingRequest polled;
+ synchronized (q) {
+ final PendingRequest peeked = q.peek();
+ if (peeked == null || peeked.getIndex() > allAcked) {
+ return;
+ }
+ polled = q.poll();
+ Preconditions.assertTrue(polled == peeked);
+ }
+ polled.completeDelayedReply();
+ }
+ }
+ }
+
private PendingRequest pendingSetConf;
private final RaftServerImpl server;
private final RequestMap pendingRequests;
private PendingRequest last = null;
+ private final DelayedReplies delayedReplies;
+
PendingRequests(RaftServerImpl server) {
this.server = server;
this.pendingRequests = new RequestMap(server.getId());
+ this.delayedReplies = new DelayedReplies(server.getId());
}
PendingRequest addPendingRequest(long index, RaftClientRequest request,
@@ -132,12 +185,20 @@ class PendingRequests {
return pendingRequest != null ? pendingRequest.getEntry() : null;
}
- void replyPendingRequest(long index, RaftClientReply reply) {
+ boolean replyPendingRequest(long index, RaftClientReply reply) {
final PendingRequest pending = pendingRequests.remove(index);
if (pending != null) {
Preconditions.assertTrue(pending.getIndex() == index);
+
+ final ReplicationLevel replication = pending.getRequest().getType().getWrite().getReplication();
+ if (replication == ReplicationLevel.ALL) {
+ if (delayedReplies.delay(pending, reply)) {
+ return false;
+ }
+ }
pending.setReply(reply);
}
+ return true;
}
/**
@@ -155,4 +216,8 @@ class PendingRequests {
pendingSetConf.setNotLeaderException(nle);
}
}
+
+ void checkDelayedReplies(long allAckedIndex) {
+ delayedReplies.update(allAckedIndex);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 91a1600..74238e8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -28,6 +28,7 @@ import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.storage.MemoryRaftLog;
import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.*;
@@ -450,6 +451,9 @@ public abstract class MiniRaftCluster {
public static Stream<RaftServerImpl> getServerStream(Collection<RaftServerProxy> servers) {
return servers.stream().map(RaftTestUtil::getImplAsUnchecked);
}
+ public Stream<RaftServerImpl> getServerStream() {
+ return getServerStream(getServers());
+ }
public Stream<RaftServerImpl> getServerAliveStream() {
return getServerStream(getServers()).filter(RaftServerImpl::isAlive);
}
@@ -504,7 +508,7 @@ public abstract class MiniRaftCluster {
public RaftClientRequest newRaftClientRequest(
ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, Message message) {
return new RaftClientRequest(clientId, leaderId, getGroupId(),
- callId, seqNum, message, RaftClientRequest.writeRequestType());
+ callId, seqNum, message, RaftClientRequest.writeRequestType(ReplicationLevel.MAJORITY));
}
public SetConfigurationRequest newSetConfigurationRequest(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 3c68469..438d56a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -30,12 +30,15 @@ import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.TimeDuration;
-import org.junit.*;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
@@ -157,7 +160,18 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
cluster.start();
waitForLeader(cluster);
- RaftBasicTests.runTestBasicAppendEntries(true, 1000, cluster, LOG);
+ RaftBasicTests.runTestBasicAppendEntries(true, ReplicationLevel.MAJORITY, 1000, cluster, LOG);
+ cluster.shutdown();
+ }
+
+ @Test
+ public void testBasicAppendEntriesAsyncWithAllReplication() throws Exception {
+ LOG.info("Running testBasicAppendEntriesAsync");
+ RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, 100);
+ final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+ cluster.start();
+ waitForLeader(cluster);
+ RaftBasicTests.runTestBasicAppendEntries(true, ReplicationLevel.ALL, 1000, cluster, LOG);
cluster.shutdown();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c692bf20/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index b0980f4..4deeef5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -24,12 +24,14 @@ import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RetryCacheTestUtil;
import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.TimeDuration;
@@ -90,16 +92,39 @@ public abstract class RaftBasicTests extends BaseTest {
@Test
public void testBasicAppendEntries() throws Exception {
- runTestBasicAppendEntries(false, 10, getCluster(), LOG);
+ runTestBasicAppendEntries(false, ReplicationLevel.MAJORITY, 10, getCluster(), LOG);
+ }
+
+ @Test
+ public void testBasicAppendEntriesWithAllReplication() throws Exception {
+ runTestBasicAppendEntries(false, ReplicationLevel.ALL, 10, getCluster(), LOG);
}
static void runTestBasicAppendEntries(
- boolean async, int numMessages, MiniRaftCluster cluster, Logger LOG) throws Exception {
+ boolean async, ReplicationLevel replication, int numMessages, MiniRaftCluster cluster, Logger LOG) throws Exception {
LOG.info("runTestBasicAppendEntries: async? " + async + ", numMessages=" + numMessages);
+ for (RaftServer s : cluster.getServers()) {
+ cluster.restartServer(s.getId(), false);
+ }
RaftServerImpl leader = waitForLeader(cluster);
final long term = leader.getState().getCurrentTerm();
+
final RaftPeerId killed = cluster.getFollowers().get(0).getId();
cluster.killServer(killed);
+
+ if (replication == ReplicationLevel.ALL) {
+ new Thread(() -> {
+ try {
+ Thread.sleep(3000);
+ LOG.info("restart server: " + killed.toString());
+ cluster.restartServer(killed, false);
+ } catch (Exception e) {
+ LOG.info("cannot restart server: " + killed.toString());
+ e.printStackTrace();
+ }
+ }).start();
+ }
+
LOG.info(cluster.printServers());
final SimpleMessage[] messages = SimpleMessage.create(numMessages);
@@ -110,7 +135,7 @@ public abstract class RaftBasicTests extends BaseTest {
for (SimpleMessage message : messages) {
if (async) {
- client.sendAsync(message).thenAcceptAsync(reply -> {
+ client.sendAsync(message, replication).thenAcceptAsync(reply -> {
if (!reply.isSuccess()) {
f.completeExceptionally(
new AssertionError("Failed with reply " + reply));
@@ -119,7 +144,7 @@ public abstract class RaftBasicTests extends BaseTest {
}
});
} else {
- client.send(message);
+ client.send(message, replication);
}
}
if (async) {
@@ -127,14 +152,16 @@ public abstract class RaftBasicTests extends BaseTest {
Assert.assertEquals(messages.length, asyncReplyCount.get());
}
}
-
- Thread.sleep(cluster.getMaxTimeout() + 100);
+ if (replication != ReplicationLevel.ALL) {
+ Thread.sleep(cluster.getMaxTimeout() + 100);
+ }
LOG.info(cluster.printAllLogs());
cluster.getServerAliveStream().map(s -> s.getState().getLog())
.forEach(log -> RaftTestUtil.assertLogEntries(log, async, term, messages));
}
+
@Test
public void testOldLeaderCommit() throws Exception {
LOG.info("Running testOldLeaderCommit");