You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@ratis.apache.org by ms...@apache.org on 2018/08/11 12:21:47 UTC
incubator-ratis git commit: RATIS-270. Replication ALL requests should not be replied from retry cache if they are delayed. Contributed by Tsz Wo Nicholas Sze.
Repository: incubator-ratis
Updated Branches:
refs/heads/master e4a016fb0 -> 86db875aa
RATIS-270. Replication ALL requests should not be replied from retry cache if they are delayed. Contributed by Tsz Wo Nicholas Sze.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/86db875a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/86db875a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/86db875a
Branch: refs/heads/master
Commit: 86db875aa622fb5432c6048894eafc54c236a7c1
Parents: e4a016f
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Sat Aug 11 17:50:37 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Sat Aug 11 17:50:37 2018 +0530
----------------------------------------------------------------------
.../ratis/grpc/TestRetryCacheWithGrpc.java | 71 +++++++++++++++++++-
.../apache/ratis/server/impl/LeaderState.java | 7 +-
.../ratis/server/impl/PendingRequest.java | 39 ++++++++---
.../ratis/server/impl/PendingRequests.java | 11 +--
.../ratis/server/impl/RaftServerImpl.java | 11 ++-
.../java/org/apache/ratis/MiniRaftCluster.java | 13 +++-
.../java/org/apache/ratis/RetryCacheTests.java | 38 +++++------
7 files changed, 149 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
index 956fd66..f577a48 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
@@ -18,12 +18,26 @@
package org.apache.ratis.grpc;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.log4j.Level;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.RetryCacheTests;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.shaded.proto.RaftProtos;
import org.apache.ratis.util.LogUtils;
import org.junit.Assert;
+import org.junit.Test;
public class TestRetryCacheWithGrpc extends RetryCacheTests {
static {
@@ -43,4 +57,59 @@ public class TestRetryCacheWithGrpc extends RetryCacheTests {
return cluster;
}
-}
+ @Test
+ public void testAsyncRetryWithReplicatedAll() throws Exception {
+ final MiniRaftCluster cluster = getCluster();
+ RaftTestUtil.waitForLeader(cluster);
+
+ final RaftPeerId leaderId = cluster.getLeaderAndSendFirstMessage().getId();
+ long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
+
+ // Kill a follower
+ final RaftPeerId killedFollower = cluster.getFollowers().get(0).getId();
+ cluster.killServer(killedFollower);
+
+ final long callId = 999;
+ final long seqNum = 111;
+ final ClientId clientId = ClientId.randomId();
+
+ // Retry with the same clientId and callId
+ final List<CompletableFuture<RaftClient>> futures = new ArrayList<>();
+ futures.addAll(sendRetry(clientId, leaderId, callId, seqNum, cluster));
+ futures.addAll(sendRetry(clientId, leaderId, callId, seqNum, cluster));
+
+ // restart the killed follower
+ cluster.restartServer(killedFollower, false);
+ for(CompletableFuture<RaftClient> f : futures) {
+ f.join().close();
+ }
+ assertServer(cluster, clientId, callId, oldLastApplied);
+ }
+
+ List<CompletableFuture<RaftClient>> sendRetry(
+ ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, MiniRaftCluster cluster)
+ throws Exception {
+ List<CompletableFuture<RaftClient>> futures = new ArrayList<>();
+ final int numRequest = 3;
+ for (int i = 0; i < numRequest; i++) {
+ final RaftClient client = cluster.createClient(leaderId, cluster.getGroup(), clientId);
+ final RaftClientRpc rpc = client.getClientRpc();
+ final RaftClientRequest request = cluster.newRaftClientRequest(client.getId(), leaderId,
+ callId, seqNum, new RaftTestUtil.SimpleMessage("message"), RaftProtos.ReplicationLevel.ALL);
+
+ LOG.info("{} sendRequestAsync {}", i, request);
+ futures.add(rpc.sendRequestAsync(request)
+ .thenApply(reply -> assertReply(reply, client, callId)));
+ }
+
+ for(CompletableFuture<RaftClient> f : futures) {
+ try {
+ f.get(200, TimeUnit.MILLISECONDS);
+ Assert.fail("It should timeout for ReplicationLevel.ALL since a follower is down");
+ } catch(TimeoutException te) {
+ LOG.info("Expected " + te);
+ }
+ }
+ return futures;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 32b787f..a13284f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -573,10 +573,13 @@ public class LeaderState {
return lists;
}
- void replyPendingRequest(long logIndex, RaftClientReply reply) {
- if (!pendingRequests.replyPendingRequest(logIndex, reply)) {
+ /** @return true if the request is replied; otherwise, the reply is delayed, return false. */
+ boolean replyPendingRequest(long logIndex, RaftClientReply reply, RetryCache.CacheEntry cacheEntry) {
+ if (!pendingRequests.replyPendingRequest(logIndex, reply, cacheEntry)) {
submitUpdateStateEvent(UPDATE_COMMIT_EVENT);
+ return false;
}
+ return true;
}
TransactionContext getTransactionContext(long index) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
index 4a72197..cdb283f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -18,6 +18,7 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.impl.RetryCache.CacheEntry;
import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.Preconditions;
@@ -26,15 +27,35 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
public class PendingRequest implements Comparable<PendingRequest> {
+ private static class DelayedReply {
+ private final RaftClientReply reply;
+ private final CacheEntry cacheEntry;
+
+ DelayedReply(RaftClientReply reply, CacheEntry cacheEntry) {
+ this.reply = reply;
+ this.cacheEntry = cacheEntry;
+ }
+
+ RaftClientReply getReply() {
+ cacheEntry.updateResult(reply);
+ return reply;
+ }
+
+ RaftClientReply fail(NotReplicatedException e) {
+ final RaftClientReply failed = new RaftClientReply(reply, e);
+ cacheEntry.updateResult(failed);
+ return failed;
+ }
+ }
+
private final long index;
private final RaftClientRequest request;
private final TransactionContext entry;
private final CompletableFuture<RaftClientReply> future;
- private volatile RaftClientReply delayed;
+ private volatile DelayedReply delayed;
- PendingRequest(long index, RaftClientRequest request,
- TransactionContext entry) {
+ PendingRequest(long index, RaftClientRequest request, TransactionContext entry) {
this.index = index;
this.request = request;
this.entry = entry;
@@ -74,20 +95,20 @@ public class PendingRequest implements Comparable<PendingRequest> {
future.complete(r);
}
- synchronized void setDelayedReply(RaftClientReply r) {
+ synchronized void setDelayedReply(RaftClientReply r, CacheEntry c) {
Objects.requireNonNull(r);
Preconditions.assertTrue(delayed == null);
- delayed = r;
+ delayed = new DelayedReply(r, c);
}
synchronized void completeDelayedReply() {
- setReply(delayed);
+ setReply(delayed.getReply());
}
synchronized void failDelayedReply() {
- final RaftClientRequest.Type type = request.getType();
- final ReplicationLevel replication = type.getWrite().getReplication();
- setReply(new RaftClientReply(delayed, new NotReplicatedException(request.getCallId(), replication, index)));
+ final ReplicationLevel replication = request.getType().getWrite().getReplication();
+ final NotReplicatedException e = new NotReplicatedException(request.getCallId(), replication, index);
+ setReply(delayed.fail(e));
}
TransactionContext setNotLeaderException(NotLeaderException nle) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 2cf1271..92b3e96 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -18,10 +18,10 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.impl.RetryCache.CacheEntry;
import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.statemachine.TransactionContext;
-import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,13 +84,13 @@ class PendingRequests {
this.name = name + "-" + getClass().getSimpleName();
}
- boolean delay(PendingRequest request, RaftClientReply reply) {
+ boolean delay(PendingRequest request, RaftClientReply reply, CacheEntry cacheEntry) {
if (request.getIndex() <= allAckedIndex.get()) {
return false; // delay is not required.
}
LOG.debug("{}: delay request {}", name, request);
- request.setDelayedReply(reply);
+ request.setDelayedReply(reply, cacheEntry);
final boolean offered;
synchronized (q) {
offered = q.offer(request);
@@ -194,14 +194,15 @@ class PendingRequests {
return pendingRequest != null ? pendingRequest.getEntry() : null;
}
- boolean replyPendingRequest(long index, RaftClientReply reply) {
+ /** @return true if the request is replied; otherwise, the reply is delayed, return false. */
+ boolean replyPendingRequest(long index, RaftClientReply reply, CacheEntry cacheEntry) {
final PendingRequest pending = pendingRequests.remove(index);
if (pending != null) {
Preconditions.assertTrue(pending.getIndex() == index);
final ReplicationLevel replication = pending.getRequest().getType().getWrite().getReplication();
if (replication == ReplicationLevel.ALL) {
- if (delayedReplies.delay(pending, reply)) {
+ if (delayedReplies.delay(pending, reply, cacheEntry)) {
return false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 3879f4a..f2114b9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1064,14 +1064,19 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
final StateMachineException e = new StateMachineException(getId(), exception);
r = new RaftClientReply(clientId, serverId, groupId, callId, false, null, e, getCommitInfos());
}
- // update retry cache
- cacheEntry.updateResult(r);
+
// update pending request
+ boolean updateCache = true; // always update cache for follower
synchronized (RaftServerImpl.this) {
if (isLeader() && leaderState != null) { // is leader and is running
- leaderState.replyPendingRequest(logEntry.getIndex(), r);
+ // For leader, update cache unless the reply is delayed.
+ // When a reply is delayed, the cache will be updated in DelayedReply.getReply().
+ updateCache = leaderState.replyPendingRequest(logEntry.getIndex(), r, cacheEntry);
}
}
+ if (updateCache) {
+ cacheEntry.updateResult(r);
+ }
});
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 2634717..3806bb8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -39,7 +39,6 @@ import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.*;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -502,7 +501,12 @@ public abstract class MiniRaftCluster {
}
public RaftClient createClient(RaftPeerId leaderId, RaftGroup group) {
+ return createClient(leaderId, group, null);
+ }
+
+ public RaftClient createClient(RaftPeerId leaderId, RaftGroup group, ClientId clientId) {
return RaftClient.newBuilder()
+ .setClientId(clientId)
.setRaftGroup(group)
.setLeaderId(leaderId)
.setProperties(properties)
@@ -518,8 +522,13 @@ public abstract class MiniRaftCluster {
public RaftClientRequest newRaftClientRequest(
ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, Message message) {
+ return newRaftClientRequest(clientId, leaderId, callId, seqNum, message, ReplicationLevel.MAJORITY);
+ }
+
+ public RaftClientRequest newRaftClientRequest(
+ ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, Message message, ReplicationLevel replication) {
return new RaftClientRequest(clientId, leaderId, getGroupId(),
- callId, seqNum, message, RaftClientRequest.writeRequestType(ReplicationLevel.MAJORITY));
+ callId, seqNum, message, RaftClientRequest.writeRequestType(replication));
}
public SetConfigurationRequest newSetConfigurationRequest(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86db875a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index 2d352b4..9fdb4f7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -22,6 +22,7 @@ import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
@@ -80,18 +81,25 @@ public abstract class RetryCacheTests extends BaseTest {
final long seqNum = 111;
RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
callId, seqNum, new SimpleMessage("message"));
- RaftClientReply reply = rpc.sendRequest(r);
- Assert.assertEquals(callId, reply.getCallId());
- Assert.assertTrue(reply.isSuccess());
+ assertReply(rpc.sendRequest(r), client, callId);
// retry with the same callId
for (int i = 0; i < 5; i++) {
- reply = rpc.sendRequest(r);
- Assert.assertEquals(client.getId(), reply.getClientId());
- Assert.assertEquals(callId, reply.getCallId());
- Assert.assertTrue(reply.isSuccess());
+ assertReply(rpc.sendRequest(r), client, callId);
}
+ assertServer(cluster, client.getId(), callId, oldLastApplied);
+ client.close();
+ }
+
+ public static RaftClient assertReply(RaftClientReply reply, RaftClient client, long callId) {
+ Assert.assertEquals(client.getId(), reply.getClientId());
+ Assert.assertEquals(callId, reply.getCallId());
+ Assert.assertTrue(reply.isSuccess());
+ return client;
+ }
+
+ public void assertServer(MiniRaftCluster cluster, ClientId clientId, long callId, long oldLastApplied) throws Exception {
long leaderApplied = cluster.getLeader().getState().getLastAppliedIndex();
// make sure retry cache has the entry
for (RaftServerImpl server : cluster.iterateServerImpls()) {
@@ -100,13 +108,10 @@ public abstract class RetryCacheTests extends BaseTest {
Thread.sleep(1000);
}
Assert.assertEquals(2, RaftServerTestUtil.getRetryCacheSize(server));
- Assert.assertNotNull(
- RaftServerTestUtil.getRetryEntry(server, client.getId(), callId));
+ Assert.assertNotNull(RaftServerTestUtil.getRetryEntry(server, clientId, callId));
// make sure there is only one log entry committed
- Assert.assertEquals(oldLastApplied + 1,
- server.getState().getLastAppliedIndex());
+ Assert.assertEquals(oldLastApplied + 1, server.getState().getLastAppliedIndex());
}
- client.close();
}
/**
@@ -125,9 +130,7 @@ public abstract class RetryCacheTests extends BaseTest {
final long seqNum = 111;
RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
callId, seqNum, new SimpleMessage("message"));
- RaftClientReply reply = rpc.sendRequest(r);
- Assert.assertEquals(callId, reply.getCallId());
- Assert.assertTrue(reply.isSuccess());
+ assertReply(rpc.sendRequest(r), client, callId);
long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
// trigger the reconfiguration, make sure the original leader is kicked out
@@ -146,11 +149,8 @@ public abstract class RetryCacheTests extends BaseTest {
rpc.addServers(Arrays.asList(change.newPeers));
for (int i = 0; i < 10; i++) {
try {
- reply = rpc.sendRequest(r);
+ assertReply(rpc.sendRequest(r), client, callId);
LOG.info("successfully sent out the retry request_" + i);
- Assert.assertEquals(client.getId(), reply.getClientId());
- Assert.assertEquals(callId, reply.getCallId());
- Assert.assertTrue(reply.isSuccess());
} catch (Exception e) {
LOG.info("hit exception while retrying the same request: " + r, e);
}