You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/03/31 07:17:47 UTC
incubator-ratis git commit: RATIS-17. Add basic retry cache
implementation for Raft Server. Contributed by Jing Zhao.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 771e8adc4 -> 42fff2b26
RATIS-17. Add basic retry cache implementation for Raft Server. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/42fff2b2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/42fff2b2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/42fff2b2
Branch: refs/heads/master
Commit: 42fff2b266a7841e075cc2202fe4530a4c4cab61
Parents: 771e8ad
Author: Jing Zhao <ji...@apache.org>
Authored: Fri Mar 31 00:17:37 2017 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Fri Mar 31 00:17:37 2017 -0700
----------------------------------------------------------------------
.../TestRaftStateMachineException.java | 116 +++++++++-
.../ratis/grpc/TestRetryCacheWithGrpc.java | 46 ++++
.../hadooprpc/TestRetryCacheWithHadoopRpc.java | 46 ++++
.../ratis/netty/TestRetryCacheWithNettyRpc.java | 45 ++++
.../ratis/server/RaftServerConfigKeys.java | 21 ++
.../apache/ratis/server/impl/LeaderState.java | 6 +-
.../ratis/server/impl/PendingRequest.java | 3 +
.../ratis/server/impl/PendingRequests.java | 16 +-
.../ratis/server/impl/RaftServerImpl.java | 132 ++++++++---
.../apache/ratis/server/impl/RetryCache.java | 225 +++++++++++++++++++
.../ratis/server/impl/StateMachineUpdater.java | 2 +-
.../org/apache/ratis/RaftRetryCacheTests.java | 174 ++++++++++++++
.../ratis/server/impl/RaftServerTestUtil.java | 14 ++
.../TestRetryCacheWithSimulatedRpc.java | 45 ++++
14 files changed, 842 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
index 447f2ea..d8a32f6 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
@@ -21,11 +21,16 @@ import org.apache.log4j.Level;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.examples.RaftExamplesTestUtil;
import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.impl.RetryCache;
import org.apache.ratis.server.simulation.RequestHandler;
import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.util.RaftUtils;
@@ -33,6 +38,8 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
@@ -42,6 +49,8 @@ import static org.junit.Assert.fail;
@RunWith(Parameterized.class)
public class TestRaftStateMachineException {
+ public static final Logger LOG = LoggerFactory.getLogger(TestRaftStateMachineException.class);
+
static {
RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
@@ -49,6 +58,8 @@ public class TestRaftStateMachineException {
RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
}
+ protected static boolean failPreAppend = false;
+
protected static class StateMachineWithException extends SimpleStateMachine4Testing {
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
@@ -56,6 +67,16 @@ public class TestRaftStateMachineException {
future.completeExceptionally(new StateMachineException("Fake Exception"));
return future;
}
+
+ @Override
+ public TransactionContext preAppendTransaction(TransactionContext trx)
+ throws IOException {
+ if (failPreAppend) {
+ throw new IOException("Fake Exception");
+ } else {
+ return trx;
+ }
+ }
}
@Parameterized.Parameters
@@ -69,7 +90,7 @@ public class TestRaftStateMachineException {
@Test
public void testHandleStateMachineException() throws Exception {
- cluster.start();
+ cluster.restart(true);
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeader().getId();
@@ -84,4 +105,97 @@ public class TestRaftStateMachineException {
cluster.shutdown();
}
+
+ @Test
+ public void testRetryOnStateMachineException() throws Exception {
+ cluster.restart(true);
+ RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId leaderId = cluster.getLeader().getId();
+
+ RaftClient client = cluster.createClient(leaderId);
+ try {
+ client.send(new RaftTestUtil.SimpleMessage("first msg to make leader ready"));
+ } catch (Exception ignored) {
+ }
+ long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
+
+ final RaftClientRpc rpc = client.getClientRpc();
+ final long callId = 999;
+ RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
+ callId, new RaftTestUtil.SimpleMessage("message"));
+ RaftClientReply reply = rpc.sendRequest(r);
+ Assert.assertFalse(reply.isSuccess());
+ Assert.assertNotNull(reply.getStateMachineException());
+
+ // retry with the same callId
+ for (int i = 0; i < 5; i++) {
+ reply = rpc.sendRequest(r);
+ Assert.assertEquals(client.getId(), reply.getClientId());
+ Assert.assertEquals(callId, reply.getCallId());
+ Assert.assertFalse(reply.isSuccess());
+ Assert.assertNotNull(reply.getStateMachineException());
+ }
+
+ long leaderApplied = cluster.getLeader().getState().getLastAppliedIndex();
+ // make sure retry cache has the entry
+ for (RaftServerImpl server : cluster.getServers()) {
+ LOG.info("check server " + server.getId());
+ if (server.getState().getLastAppliedIndex() < leaderApplied) {
+ Thread.sleep(1000);
+ }
+ Assert.assertNotNull(
+ RaftServerTestUtil.getRetryEntry(server, client.getId(), callId));
+ Assert.assertEquals(oldLastApplied + 1,
+ server.getState().getLastAppliedIndex());
+ }
+
+ cluster.shutdown();
+ }
+
+ @Test
+ public void testRetryOnExceptionDuringReplication() throws Exception {
+ cluster.restart(true);
+ RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId leaderId = cluster.getLeader().getId();
+
+ RaftClient client = cluster.createClient(leaderId);
+ try {
+ client.send(new RaftTestUtil.SimpleMessage("first msg to make leader ready"));
+ } catch (Exception ignored) {
+ }
+
+ // turn on the preAppend failure switch
+ failPreAppend = true;
+ final RaftClientRpc rpc = client.getClientRpc();
+ final long callId = 999;
+ RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
+ callId, new RaftTestUtil.SimpleMessage("message"));
+ try {
+ rpc.sendRequest(r);
+ Assert.fail("Exception expected");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ RetryCache.CacheEntry oldEntry = RaftServerTestUtil.getRetryEntry(
+ cluster.getLeader(), client.getId(), callId);
+ Assert.assertNotNull(oldEntry);
+ Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(oldEntry));
+
+ // retry
+ try {
+ rpc.sendRequest(r);
+ Assert.fail("Exception expected");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ RetryCache.CacheEntry currentEntry = RaftServerTestUtil.getRetryEntry(
+ cluster.getLeader(), client.getId(), callId);
+ Assert.assertNotNull(currentEntry);
+ Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(currentEntry));
+ Assert.assertNotEquals(oldEntry, currentEntry);
+
+ failPreAppend = false;
+ cluster.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
new file mode 100644
index 0000000..d709c1c
--- /dev/null
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftRetryCacheTests;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.Assert;
+
+import java.io.IOException;
+
+public class TestRetryCacheWithGrpc extends RaftRetryCacheTests {
+ static {
+ RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+ }
+
+ private final MiniRaftClusterWithGRpc cluster;
+
+ public TestRetryCacheWithGrpc() throws IOException {
+ cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(
+ NUM_SERVERS, properties);
+ Assert.assertNull(cluster.getLeader());
+ }
+
+ @Override
+ public MiniRaftClusterWithGRpc getCluster() {
+ return cluster;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRetryCacheWithHadoopRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRetryCacheWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRetryCacheWithHadoopRpc.java
new file mode 100644
index 0000000..fade34b
--- /dev/null
+++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRetryCacheWithHadoopRpc.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftRetryCacheTests;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.RaftUtils;
+
+import java.io.IOException;
+
+public class TestRetryCacheWithHadoopRpc extends RaftRetryCacheTests {
+ static {
+ RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+ RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+ RaftUtils.setLogLevel(MiniRaftClusterWithHadoopRpc.LOG, Level.DEBUG);
+ }
+
+ private final MiniRaftClusterWithHadoopRpc cluster;
+
+ public TestRetryCacheWithHadoopRpc() throws IOException {
+ cluster = MiniRaftClusterWithHadoopRpc.FACTORY.newCluster(
+ NUM_SERVERS, getProperties());
+ }
+
+ @Override
+ public MiniRaftClusterWithHadoopRpc getCluster() {
+ return cluster;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-netty/src/test/java/org/apache/ratis/netty/TestRetryCacheWithNettyRpc.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestRetryCacheWithNettyRpc.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRetryCacheWithNettyRpc.java
new file mode 100644
index 0000000..540cc16
--- /dev/null
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRetryCacheWithNettyRpc.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftRetryCacheTests;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.RaftUtils;
+
+import java.io.IOException;
+
+public class TestRetryCacheWithNettyRpc extends RaftRetryCacheTests {
+ static {
+ RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+ RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+ }
+
+ private final MiniRaftClusterWithNetty cluster;
+
+ public TestRetryCacheWithNettyRpc() throws IOException {
+ cluster = MiniRaftClusterWithNetty.FACTORY.newCluster(
+ NUM_SERVERS, getProperties());
+ }
+
+ @Override
+ public MiniRaftClusterWithNetty getCluster() {
+ return cluster;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 66e06e0..be0c6bc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -17,11 +17,13 @@
*/
package org.apache.ratis.server;
+import org.apache.ratis.conf.ConfUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
import static org.apache.ratis.conf.ConfUtils.*;
@@ -185,6 +187,25 @@ public interface RaftServerConfigKeys {
}
}
+ /** server retry cache related */
+ interface RetryCache {
+ String PREFIX = RaftServerConfigKeys.PREFIX + ".retrycache";
+
+ String CAPACITY_KEY = PREFIX + ".capacity";
+ int CAPACITY_DEFAULT = 4096;
+ static int capacity(RaftProperties properties) {
+ return ConfUtils.getInt(properties::getInt, CAPACITY_KEY, CAPACITY_DEFAULT,
+ ConfUtils.requireMin(0));
+ }
+
+ String EXPIRYTIME_KEY = PREFIX + ".expirytime";
+ TimeDuration EXPIRYTIME_DEFAULT = TimeDuration.valueOf(60, TimeUnit.SECONDS);
+ static TimeDuration expiryTime(RaftProperties properties) {
+ return getTimeDuration(properties.getTimeDuration(EXPIRYTIME_DEFAULT.getUnit()),
+ EXPIRYTIME_KEY, EXPIRYTIME_DEFAULT);
+ }
+ }
+
static void main(String[] args) {
printAll(RaftServerConfigKeys.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index f6c9ade..e7704fc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -31,7 +31,6 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -516,9 +515,8 @@ public class LeaderState {
return pending;
}
- void replyPendingRequest(long logIndex,
- CompletableFuture<Message> stateMachineFuture) {
- pendingRequests.replyPendingRequest(logIndex, stateMachineFuture);
+ void replyPendingRequest(long logIndex, RaftClientReply reply) {
+ pendingRequests.replyPendingRequest(logIndex, reply);
}
TransactionContext getTransactionContext(long index) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
index f5581b9..1d51e54 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -60,6 +60,9 @@ public class PendingRequest implements Comparable<PendingRequest> {
return entry;
}
+ /**
+ * This is only used when setting new raft configuration.
+ */
synchronized void setException(Throwable e) {
RaftUtils.assertTrue(e != null);
future.completeExceptionally(e);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index d4b74f2..c56d7a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -88,23 +88,11 @@ class PendingRequests {
return pendingRequest != null ? pendingRequest.getEntry() : null;
}
- void replyPendingRequest(long index,
- CompletableFuture<Message> stateMachineFuture) {
+ void replyPendingRequest(long index, RaftClientReply reply) {
final PendingRequest pending = pendingRequests.get(index);
if (pending != null) {
RaftUtils.assertTrue(pending.getIndex() == index);
-
- stateMachineFuture.whenComplete((reply, exception) -> {
- if (exception == null) {
- pending.setSuccessReply(reply);
- } else {
- // the exception is coming from the state machine. wrap it into the
- // reply as a StateMachineException
- final StateMachineException e = new StateMachineException(
- server.getId().toString(), exception);
- pending.setReply(new RaftClientReply(pending.getRequest(), e));
- }
- });
+ pending.setReply(reply);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 082c1bf..ca1b6d3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -27,6 +27,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
@@ -35,6 +36,7 @@ import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.RaftUtils;
+import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,6 +90,8 @@ public class RaftServerImpl implements RaftServer {
private final ServerFactory factory;
+ private final RetryCache retryCache;
+
RaftServerImpl(RaftPeerId id, StateMachine stateMachine,
RaftConfiguration raftConf, RaftProperties properties, Parameters parameters)
throws IOException {
@@ -103,6 +107,13 @@ public class RaftServerImpl implements RaftServer {
final RpcType rpcType = RaftConfigKeys.Rpc.type(properties);
this.factory = ServerFactory.cast(rpcType.newFactory(properties, parameters));
this.serverRpc = initRaftServerRpc();
+ retryCache = initRetryCache(properties);
+ }
+
+ private RetryCache initRetryCache(RaftProperties prop) {
+ final int capacity = RaftServerConfigKeys.RetryCache.capacity(prop);
+ final TimeDuration expireTime = RaftServerConfigKeys.RetryCache.expiryTime(prop);
+ return new RetryCache(capacity, expireTime);
}
@Override
@@ -133,6 +144,11 @@ public class RaftServerImpl implements RaftServer {
return this.stateMachine;
}
+ @VisibleForTesting
+ RetryCache getRetryCache() {
+ return retryCache;
+ }
+
private RaftServerRpc initRaftServerRpc() {
final RaftServerRpc rpc = getFactory().newRaftServerRpc(this);
// add peers into rpc service
@@ -322,17 +338,26 @@ public class RaftServerImpl implements RaftServer {
* @return null if the server is in leader state.
*/
private CompletableFuture<RaftClientReply> checkLeaderState(
- RaftClientRequest request) {
+ RaftClientRequest request, RetryCache.CacheEntry entry) {
if (!isLeader()) {
NotLeaderException exception = generateNotLeaderException();
- CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
- future.complete(new RaftClientReply(request, exception));
- return future;
+ final RaftClientReply reply = new RaftClientReply(request, exception);
+ if (entry != null) {
+ entry.failWithReply(reply);
+ }
+ return entry != null ?
+ entry.getReplyFuture() : CompletableFuture.completedFuture(reply);
} else {
if (leaderState == null || !leaderState.isReady()) {
- CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
- future.completeExceptionally(new LeaderNotReadyException());
- return future;
+ final Exception e = new LeaderNotReadyException();
+ if (entry != null) {
+ entry.failWithException(e);
+ return entry.getReplyFuture();
+ } else {
+ CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
+ future.completeExceptionally(e);
+ return future;
+ }
}
}
return null;
@@ -360,15 +385,15 @@ public class RaftServerImpl implements RaftServer {
* Handle a normal update request from client.
*/
private CompletableFuture<RaftClientReply> appendTransaction(
- RaftClientRequest request, TransactionContext entry)
- throws RaftException {
+ RaftClientRequest request, TransactionContext context,
+ RetryCache.CacheEntry retryEntry) throws RaftException {
LOG.debug("{}: receive client request({})", getId(), request);
lifeCycle.assertCurrentState(RUNNING);
CompletableFuture<RaftClientReply> reply;
final PendingRequest pending;
synchronized (this) {
- reply = checkLeaderState(request);
+ reply = checkLeaderState(request, retryEntry);
if (reply != null) {
return reply;
}
@@ -376,14 +401,19 @@ public class RaftServerImpl implements RaftServer {
// append the message to its local log
final long entryIndex;
try {
- entryIndex = state.applyLog(entry, request.getClientId(),
+ entryIndex = state.applyLog(context, request.getClientId(),
request.getCallId());
} catch (IOException e) {
- throw new RaftException(e);
+ // TODO looks like the IOException is actually only thrown by the SM in
+ // the preAppend stage. In that case we should wrap the exception in
+ // StateMachineException and return the exception in a RaftClientReply.
+ RaftException re = new RaftException(e);
+ retryEntry.failWithException(re);
+ throw re;
}
// put the request into the pending queue
- pending = leaderState.addPendingRequest(entryIndex, request, entry);
+ pending = leaderState.addPendingRequest(entryIndex, request, context);
leaderState.notifySenders();
}
return pending.getFuture();
@@ -393,27 +423,41 @@ public class RaftServerImpl implements RaftServer {
public CompletableFuture<RaftClientReply> submitClientRequestAsync(
RaftClientRequest request) throws IOException {
// first check the server's leader state
- CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
+ CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null);
if (reply != null) {
return reply;
}
// let the state machine handle read-only request from client
if (request.isReadOnly()) {
- // TODO: We might not be the leader anymore by the time this completes. See the RAFT paper,
- // section 8 (last part)
+ // TODO: We might not be the leader anymore by the time this completes.
+ // See the RAFT paper section 8 (last part)
return stateMachine.query(request);
}
- // TODO: this client request will not be added to pending requests
- // until later which means that any failure in between will leave partial state in the
- // state machine. We should call cancelTransaction() for failed requests
- TransactionContext entry = stateMachine.startTransaction(request);
- if (entry.getException().isPresent()) {
- throw RaftUtils.asIOException(entry.getException().get());
+ // query the retry cache
+ RetryCache.CacheQueryResult previousResult = retryCache.queryCache(
+ request.getClientId(), request.getCallId());
+ if (previousResult.isRetry()) {
+ // if the previous attempt is still pending or it succeeded, return its
+ // future
+ return previousResult.getEntry().getReplyFuture();
+ }
+ final RetryCache.CacheEntry cacheEntry = previousResult.getEntry();
+
+ // TODO: this client request will not be added to pending requests until
+ // later which means that any failure in between will leave partial state in
+ // the state machine. We should call cancelTransaction() for failed requests
+ TransactionContext context = stateMachine.startTransaction(request);
+ if (context.getException().isPresent()) {
+ RaftClientReply exceptionReply = new RaftClientReply(request,
+ new StateMachineException(getId().toString(),
+ context.getException().get()));
+ cacheEntry.failWithReply(exceptionReply);
+ return CompletableFuture.completedFuture(exceptionReply);
}
- return appendTransaction(request, entry);
+ return appendTransaction(request, context, cacheEntry);
}
@Override
@@ -459,7 +503,7 @@ public class RaftServerImpl implements RaftServer {
SetConfigurationRequest request) throws IOException {
LOG.debug("{}: receive setConfiguration({})", getId(), request);
lifeCycle.assertCurrentState(RUNNING);
- CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
+ CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null);
if (reply != null) {
return reply;
}
@@ -467,7 +511,7 @@ public class RaftServerImpl implements RaftServer {
final RaftPeer[] peersInNewConf = request.getPeersInNewConf();
final PendingRequest pending;
synchronized (this) {
- reply = checkLeaderState(request);
+ reply = checkLeaderState(request, null);
if (reply != null) {
return reply;
}
@@ -799,11 +843,41 @@ public class RaftServerImpl implements RaftServer {
}
}
- synchronized void replyPendingRequest(long logIndex,
+ /**
+ * The log has been submitted to the state machine. Use the future to update
+ * the pending requests and retry cache.
+ * @param logEntry the log entry that has been submitted to the state machine
+ * @param stateMachineFuture the future returned by the state machine
+ * from which we will get transaction result later
+ */
+ void replyPendingRequest(LogEntryProto logEntry,
CompletableFuture<Message> stateMachineFuture) {
- if (isLeader() && leaderState != null) { // is leader and is running
- leaderState.replyPendingRequest(logIndex, stateMachineFuture);
- }
+ // update the retry cache
+ final ClientId clientId = new ClientId(logEntry.getClientId().toByteArray());
+ final long callId = logEntry.getCallId();
+ final RaftPeerId serverId = getId();
+ final RetryCache.CacheEntry cacheEntry = retryCache.getOrCreateEntry(
+ clientId, logEntry.getCallId());
+ stateMachineFuture.whenComplete((reply, exception) -> {
+ final RaftClientReply r;
+ if (exception == null) {
+ r = new RaftClientReply(clientId, serverId, callId, true, reply, null);
+ } else {
+ // the exception is coming from the state machine. wrap it into the
+ // reply as a StateMachineException
+ final StateMachineException e = new StateMachineException(
+ getId().toString(), exception);
+ r = new RaftClientReply(clientId, serverId, callId, false, null, e);
+ }
+ // update retry cache
+ cacheEntry.updateResult(r);
+ // update pending request
+ synchronized (RaftServerImpl.this) {
+ if (isLeader() && leaderState != null) { // is leader and is running
+ leaderState.replyPendingRequest(logEntry.getIndex(), r);
+ }
+ }
+ });
}
TransactionContext getTransactionContext(long index) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
new file mode 100644
index 0000000..0372558
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.shaded.com.google.common.cache.Cache;
+import org.apache.ratis.shaded.com.google.common.cache.CacheBuilder;
+import org.apache.ratis.util.RaftUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public class RetryCache implements Closeable {
+ static final Logger LOG = LoggerFactory.getLogger(RetryCache.class);
+ private static final int MIN_CAPACITY = 128;
+
+ static class CacheKey {
+ private final ClientId clientId;
+ private final long callId;
+
+ CacheKey(ClientId clientId, long callId) {
+ this.clientId = clientId;
+ this.callId = callId;
+ }
+
+ @Override
+ public int hashCode() {
+ return clientId.hashCode() ^ Long.hashCode(callId);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof CacheKey) {
+ CacheKey e = (CacheKey) obj;
+ return e.clientId.equals(clientId) && callId == e.callId;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return clientId.toString() + ":" + this.callId;
+ }
+ }
+
+ /**
+ * CacheEntry is tracked using unique client ID and callId of the RPC request
+ */
+ @VisibleForTesting
+ public static class CacheEntry {
+ private final CacheKey key;
+ private final CompletableFuture<RaftClientReply> replyFuture =
+ new CompletableFuture<>();
+
+ /**
+ * "failed" means we failed to commit the request into the raft group, or
+ * the request did not get approved by the state machine before the raft
+ * replication. Note that once the request gets committed by the raft group, this
+ * field is never true even if the state machine throws an exception when
+ * applying the transaction.
+ */
+ private volatile boolean failed = false;
+
+ CacheEntry(CacheKey key) {
+ this.key = key;
+ }
+
+ @Override
+ public String toString() {
+ return key + ":" + (isDone() ? "done" : "pending");
+ }
+
+ boolean isDone() {
+ return isFailed() || replyFuture.isDone();
+ }
+
+ void updateResult(RaftClientReply reply) {
+ assert !replyFuture.isDone() && !replyFuture.isCancelled();
+ replyFuture.complete(reply);
+ }
+
+ boolean isFailed() {
+ return failed || replyFuture.isCompletedExceptionally();
+ }
+
+ void failWithReply(RaftClientReply reply) {
+ failed = true;
+ replyFuture.complete(reply);
+ }
+
+ void failWithException(Throwable t) {
+ failed = true;
+ replyFuture.completeExceptionally(t);
+ }
+
+ CompletableFuture<RaftClientReply> getReplyFuture() {
+ return replyFuture;
+ }
+ }
+
+ static class CacheQueryResult {
+ private final CacheEntry entry;
+ private final boolean isRetry;
+
+ CacheQueryResult(CacheEntry entry, boolean isRetry) {
+ this.entry = entry;
+ this.isRetry = isRetry;
+ }
+
+ public CacheEntry getEntry() {
+ return entry;
+ }
+
+ public boolean isRetry() {
+ return isRetry;
+ }
+ }
+
+ private final Cache<CacheKey, CacheEntry> cache;
+
+ /**
+ * @param capacity the capacity of the cache
+ * @param expirationTime how long an entry is kept after being written to the cache
+ */
+ RetryCache(int capacity, TimeDuration expirationTime) {
+ capacity = Math.max(capacity, MIN_CAPACITY);
+ cache = CacheBuilder.newBuilder().maximumSize(capacity)
+ .expireAfterWrite(expirationTime.toLong(TimeUnit.MILLISECONDS),
+ TimeUnit.MILLISECONDS).build();
+ }
+
+ CacheEntry getOrCreateEntry(ClientId clientId, long callId) {
+ final CacheKey key = new CacheKey(clientId, callId);
+ final CacheEntry entry;
+ try {
+ entry = cache.get(key, () -> new CacheEntry(key));
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(e);
+ }
+ RaftUtils.assertTrue(entry != null && !entry.isDone(),
+ "retry cache entry should be pending: %s", entry);
+ return entry;
+ }
+
+ private CacheEntry refreshEntry(CacheEntry newEntry) {
+ cache.put(newEntry.key, newEntry);
+ return newEntry;
+ }
+
+ CacheQueryResult queryCache(ClientId clientId, long callId) {
+ CacheKey key = new CacheKey(clientId, callId);
+ final CacheEntry newEntry = new CacheEntry(key);
+ CacheEntry cacheEntry;
+ try {
+ cacheEntry = cache.get(key, () -> newEntry);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(e);
+ }
+
+ if (cacheEntry == newEntry) {
+ // this is the entry we just newly created
+ return new CacheQueryResult(cacheEntry, false);
+ } else if (!cacheEntry.isDone() || !cacheEntry.isFailed()){
+ // the previous attempt is either pending or successful
+ return new CacheQueryResult(cacheEntry, true);
+ }
+
+ // the previous attempt failed, replace it with a new one.
+ synchronized (this) {
+ // need to recheck, since there may be other retry attempts being
+ // processed at the same time. The recheck+replacement should be protected
+ // by lock.
+ CacheEntry currentEntry = cache.getIfPresent(key);
+ if (currentEntry == cacheEntry || currentEntry == null) {
+ // if the failed entry has not got replaced by another retry, or the
+ // failed entry got invalidated, we add a new cache entry
+ return new CacheQueryResult(refreshEntry(newEntry), false);
+ } else {
+ return new CacheQueryResult(currentEntry, true);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ long size() {
+ return cache.size();
+ }
+
+ @VisibleForTesting
+ CacheEntry get(ClientId clientId, long callId) {
+ return cache.getIfPresent(new CacheKey(clientId, callId));
+ }
+
+ @Override
+ public synchronized void close() {
+ if (cache != null) {
+ cache.invalidateAll();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index b4fc705..38436a2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -164,7 +164,7 @@ class StateMachineUpdater implements Runnable {
// TODO: This step can be parallelized
CompletableFuture<Message> stateMachineFuture =
stateMachine.applyTransaction(trx);
- server.replyPendingRequest(next.getIndex(), stateMachineFuture);
+ server.replyPendingRequest(next, stateMachineFuture);
}
lastAppliedIndex++;
} else {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java
new file mode 100644
index 0000000..66c54cb
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.ratis.MiniRaftCluster.PeerChanges;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static java.util.Arrays.asList;
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
+
+public abstract class RaftRetryCacheTests {
+ public static final Logger LOG = LoggerFactory.getLogger(RaftRetryCacheTests.class);
+
+ public static final int NUM_SERVERS = 3;
+ protected static final RaftProperties properties = new RaftProperties();
+
+ public abstract MiniRaftCluster getCluster();
+
+ public RaftProperties getProperties() {
+ return properties;
+ }
+
+ @Rule
+ public Timeout globalTimeout = new Timeout(120 * 1000);
+
+ @Before
+ public void setup() throws IOException {
+ Assert.assertNull(getCluster().getLeader());
+ getCluster().start();
+ }
+
+ @After
+ public void tearDown() {
+ final MiniRaftCluster cluster = getCluster();
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * make sure the retry cache can correctly capture the retry from a client,
+ * and return the result from the previous request
+ */
+ @Test
+ public void testBasicRetry() throws Exception {
+ final MiniRaftCluster cluster = getCluster();
+ RaftTestUtil.waitForLeader(cluster);
+
+ final RaftPeerId leaderId = cluster.getLeader().getId();
+ RaftClient client = cluster.createClient(leaderId);
+ client.send(new RaftTestUtil.SimpleMessage("first msg to make leader ready"));
+ long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
+
+ final RaftClientRpc rpc = client.getClientRpc();
+ final long callId = 999;
+ RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
+ callId, new RaftTestUtil.SimpleMessage("message"));
+ RaftClientReply reply = rpc.sendRequest(r);
+ Assert.assertEquals(callId, reply.getCallId());
+ Assert.assertTrue(reply.isSuccess());
+
+ // retry with the same callId
+ for (int i = 0; i < 5; i++) {
+ reply = rpc.sendRequest(r);
+ Assert.assertEquals(client.getId(), reply.getClientId());
+ Assert.assertEquals(callId, reply.getCallId());
+ Assert.assertTrue(reply.isSuccess());
+ }
+
+ long leaderApplied = cluster.getLeader().getState().getLastAppliedIndex();
+ // make sure retry cache has the entry
+ for (RaftServerImpl server : cluster.getServers()) {
+ LOG.info("check server " + server.getId());
+ if (server.getState().getLastAppliedIndex() < leaderApplied) {
+ Thread.sleep(1000);
+ }
+ Assert.assertEquals(2, RaftServerTestUtil.getRetryCacheSize(server));
+ Assert.assertNotNull(
+ RaftServerTestUtil.getRetryEntry(server, client.getId(), callId));
+ // make sure there is only one log entry committed
+ Assert.assertEquals(oldLastApplied + 1,
+ server.getState().getLastAppliedIndex());
+ }
+ }
+
+ /**
+ * Test retry while the leader changes to another peer
+ */
+ @Test
+ public void testRetryOnNewLeader() throws Exception {
+ final MiniRaftCluster cluster = getCluster();
+ RaftTestUtil.waitForLeader(cluster);
+
+ final RaftPeerId leaderId = cluster.getLeader().getId();
+ RaftClient client = cluster.createClient(leaderId);
+ client.send(new RaftTestUtil.SimpleMessage("first msg to make leader ready"));
+
+ RaftClientRpc rpc = client.getClientRpc();
+ final long callId = 999;
+ RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
+ callId, new RaftTestUtil.SimpleMessage("message"));
+ RaftClientReply reply = rpc.sendRequest(r);
+ Assert.assertEquals(callId, reply.getCallId());
+ Assert.assertTrue(reply.isSuccess());
+ long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
+
+ // trigger the reconfiguration, make sure the original leader is kicked out
+ PeerChanges change = cluster.addNewPeers(2, true);
+ RaftPeer[] allPeers = cluster.removePeers(2, true,
+ asList(change.newPeers)).allPeersInNewConf;
+ // trigger setConfiguration
+ SetConfigurationRequest request = new SetConfigurationRequest(
+ client.getId(), cluster.getLeader().getId(), DEFAULT_CALLID, allPeers);
+ LOG.info("Start changing the configuration: {}", request);
+ cluster.getLeader().setConfiguration(request);
+
+ RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId newLeaderId = cluster.getLeader().getId();
+ Assert.assertNotEquals(leaderId, newLeaderId);
+ // same clientId and callId in the request
+ r = new RaftClientRequest(client.getId(), newLeaderId,
+ callId, new RaftTestUtil.SimpleMessage("message"));
+ for (int i = 0; i < 10; i++) {
+ try {
+ reply = rpc.sendRequest(r);
+ LOG.info("successfully sent out the retry request_" + i);
+ Assert.assertEquals(client.getId(), reply.getClientId());
+ Assert.assertEquals(callId, reply.getCallId());
+ Assert.assertTrue(reply.isSuccess());
+ } catch (Exception e) {
+ LOG.info("hit exception while retrying the same request: " + e);
+ }
+ Thread.sleep(100);
+ }
+
+ // check the new leader and make sure the retry did not get committed
+ Assert.assertEquals(oldLastApplied + 3,
+ cluster.getLeader().getState().getLastAppliedIndex());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 012cfd7..0762f21 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -19,6 +19,7 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftPeer;
import org.junit.Assert;
import org.slf4j.Logger;
@@ -69,4 +70,17 @@ public class RaftServerTestUtil {
RaftConfiguration initialConf) {
return new ConfigurationManager(initialConf);
}
+
+ public static long getRetryCacheSize(RaftServerImpl server) {
+ return server.getRetryCache().size();
+ }
+
+ public static RetryCache.CacheEntry getRetryEntry(RaftServerImpl server,
+ ClientId clientId, long callId) {
+ return server.getRetryCache().get(clientId, callId);
+ }
+
+ public static boolean isRetryCacheEntryFailed(RetryCache.CacheEntry entry) {
+ return entry.isFailed();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java
new file mode 100644
index 0000000..5e896e2
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftRetryCacheTests;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.RaftUtils;
+
+import java.io.IOException;
+
+public class TestRetryCacheWithSimulatedRpc extends RaftRetryCacheTests {
+ static {
+ RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+ RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+ }
+
+ private final MiniRaftClusterWithSimulatedRpc cluster;
+
+ public TestRetryCacheWithSimulatedRpc() throws IOException {
+ cluster = MiniRaftClusterWithSimulatedRpc.FACTORY.newCluster(
+ NUM_SERVERS, getProperties());
+ }
+
+ @Override
+ public MiniRaftClusterWithSimulatedRpc getCluster() {
+ return cluster;
+ }
+}