You are viewing a plain-text version of this content; the hyperlink to its canonical (HTML) version was not preserved in this copy.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/03/31 07:17:47 UTC

incubator-ratis git commit: RATIS-17. Add basic retry cache implementation for Raft Server. Contributed by Jing Zhao.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 771e8adc4 -> 42fff2b26


RATIS-17. Add basic retry cache implementation for Raft Server. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/42fff2b2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/42fff2b2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/42fff2b2

Branch: refs/heads/master
Commit: 42fff2b266a7841e075cc2202fe4530a4c4cab61
Parents: 771e8ad
Author: Jing Zhao <ji...@apache.org>
Authored: Fri Mar 31 00:17:37 2017 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Fri Mar 31 00:17:37 2017 -0700

----------------------------------------------------------------------
 .../TestRaftStateMachineException.java          | 116 +++++++++-
 .../ratis/grpc/TestRetryCacheWithGrpc.java      |  46 ++++
 .../hadooprpc/TestRetryCacheWithHadoopRpc.java  |  46 ++++
 .../ratis/netty/TestRetryCacheWithNettyRpc.java |  45 ++++
 .../ratis/server/RaftServerConfigKeys.java      |  21 ++
 .../apache/ratis/server/impl/LeaderState.java   |   6 +-
 .../ratis/server/impl/PendingRequest.java       |   3 +
 .../ratis/server/impl/PendingRequests.java      |  16 +-
 .../ratis/server/impl/RaftServerImpl.java       | 132 ++++++++---
 .../apache/ratis/server/impl/RetryCache.java    | 225 +++++++++++++++++++
 .../ratis/server/impl/StateMachineUpdater.java  |   2 +-
 .../org/apache/ratis/RaftRetryCacheTests.java   | 174 ++++++++++++++
 .../ratis/server/impl/RaftServerTestUtil.java   |  14 ++
 .../TestRetryCacheWithSimulatedRpc.java         |  45 ++++
 14 files changed, 842 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
index 447f2ea..d8a32f6 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
@@ -21,11 +21,16 @@ import org.apache.log4j.Level;
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.examples.RaftExamplesTestUtil;
 import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.impl.RetryCache;
 import org.apache.ratis.server.simulation.RequestHandler;
 import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.util.RaftUtils;
@@ -33,6 +38,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -42,6 +49,8 @@ import static org.junit.Assert.fail;
 
 @RunWith(Parameterized.class)
 public class TestRaftStateMachineException {
+  public static final Logger LOG = LoggerFactory.getLogger(TestRaftStateMachineException.class);
+
   static {
     RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
@@ -49,6 +58,8 @@ public class TestRaftStateMachineException {
     RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
   }
 
+  protected static boolean failPreAppend = false;
+
   protected static class StateMachineWithException extends SimpleStateMachine4Testing {
     @Override
     public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
@@ -56,6 +67,16 @@ public class TestRaftStateMachineException {
       future.completeExceptionally(new StateMachineException("Fake Exception"));
       return future;
     }
+
+    @Override
+    public TransactionContext preAppendTransaction(TransactionContext trx)
+        throws IOException {
+      if (failPreAppend) {
+        throw new IOException("Fake Exception");
+      } else {
+        return trx;
+      }
+    }
   }
 
   @Parameterized.Parameters
@@ -69,7 +90,7 @@ public class TestRaftStateMachineException {
 
   @Test
   public void testHandleStateMachineException() throws Exception {
-    cluster.start();
+    cluster.restart(true);
     RaftTestUtil.waitForLeader(cluster);
 
     final RaftPeerId leaderId = cluster.getLeader().getId();
@@ -84,4 +105,97 @@ public class TestRaftStateMachineException {
 
     cluster.shutdown();
   }
+
+  @Test
+  public void testRetryOnStateMachineException() throws Exception {
+    cluster.restart(true);
+    RaftTestUtil.waitForLeader(cluster);
+    final RaftPeerId leaderId = cluster.getLeader().getId();
+
+    RaftClient client = cluster.createClient(leaderId);
+    try {
+      client.send(new RaftTestUtil.SimpleMessage("first msg to make leader ready"));
+    } catch (Exception ignored) {
+    }
+    long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
+
+    final RaftClientRpc rpc = client.getClientRpc();
+    final long callId = 999;
+    RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
+        callId, new RaftTestUtil.SimpleMessage("message"));
+    RaftClientReply reply = rpc.sendRequest(r);
+    Assert.assertFalse(reply.isSuccess());
+    Assert.assertNotNull(reply.getStateMachineException());
+
+    // retry with the same callId
+    for (int i = 0; i < 5; i++) {
+      reply = rpc.sendRequest(r);
+      Assert.assertEquals(client.getId(), reply.getClientId());
+      Assert.assertEquals(callId, reply.getCallId());
+      Assert.assertFalse(reply.isSuccess());
+      Assert.assertNotNull(reply.getStateMachineException());
+    }
+
+    long leaderApplied = cluster.getLeader().getState().getLastAppliedIndex();
+    // make sure retry cache has the entry
+    for (RaftServerImpl server : cluster.getServers()) {
+      LOG.info("check server " + server.getId());
+      if (server.getState().getLastAppliedIndex() < leaderApplied) {
+        Thread.sleep(1000);
+      }
+      Assert.assertNotNull(
+          RaftServerTestUtil.getRetryEntry(server, client.getId(), callId));
+      Assert.assertEquals(oldLastApplied + 1,
+          server.getState().getLastAppliedIndex());
+    }
+
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testRetryOnExceptionDuringReplication() throws Exception {
+    cluster.restart(true);
+    RaftTestUtil.waitForLeader(cluster);
+    final RaftPeerId leaderId = cluster.getLeader().getId();
+
+    RaftClient client = cluster.createClient(leaderId);
+    try {
+      client.send(new RaftTestUtil.SimpleMessage("first msg to make leader ready"));
+    } catch (Exception ignored) {
+    }
+
+    // turn on the preAppend failure switch
+    failPreAppend = true;
+    final RaftClientRpc rpc = client.getClientRpc();
+    final long callId = 999;
+    RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
+        callId, new RaftTestUtil.SimpleMessage("message"));
+    try {
+      rpc.sendRequest(r);
+      Assert.fail("Exception expected");
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    RetryCache.CacheEntry oldEntry = RaftServerTestUtil.getRetryEntry(
+        cluster.getLeader(), client.getId(), callId);
+    Assert.assertNotNull(oldEntry);
+    Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(oldEntry));
+
+    // retry
+    try {
+      rpc.sendRequest(r);
+      Assert.fail("Exception expected");
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    RetryCache.CacheEntry currentEntry = RaftServerTestUtil.getRetryEntry(
+        cluster.getLeader(), client.getId(), callId);
+    Assert.assertNotNull(currentEntry);
+    Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(currentEntry));
+    Assert.assertNotEquals(oldEntry, currentEntry);
+
+    failPreAppend = false;
+    cluster.shutdown();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
new file mode 100644
index 0000000..d709c1c
--- /dev/null
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftRetryCacheTests;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.Assert;
+
+import java.io.IOException;
+
+public class TestRetryCacheWithGrpc extends RaftRetryCacheTests {
+  static {
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+  }
+
+  private final MiniRaftClusterWithGRpc cluster;
+
+  public TestRetryCacheWithGrpc() throws IOException {
+    cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(
+        NUM_SERVERS, properties);
+    Assert.assertNull(cluster.getLeader());
+  }
+
+  @Override
+  public MiniRaftClusterWithGRpc getCluster() {
+    return cluster;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRetryCacheWithHadoopRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRetryCacheWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRetryCacheWithHadoopRpc.java
new file mode 100644
index 0000000..fade34b
--- /dev/null
+++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRetryCacheWithHadoopRpc.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftRetryCacheTests;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.RaftUtils;
+
+import java.io.IOException;
+
+public class TestRetryCacheWithHadoopRpc extends RaftRetryCacheTests {
+  static {
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(MiniRaftClusterWithHadoopRpc.LOG, Level.DEBUG);
+  }
+
+  private final MiniRaftClusterWithHadoopRpc cluster;
+
+  public TestRetryCacheWithHadoopRpc() throws IOException {
+    cluster = MiniRaftClusterWithHadoopRpc.FACTORY.newCluster(
+        NUM_SERVERS, getProperties());
+  }
+
+  @Override
+  public MiniRaftClusterWithHadoopRpc getCluster() {
+    return cluster;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-netty/src/test/java/org/apache/ratis/netty/TestRetryCacheWithNettyRpc.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestRetryCacheWithNettyRpc.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRetryCacheWithNettyRpc.java
new file mode 100644
index 0000000..540cc16
--- /dev/null
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRetryCacheWithNettyRpc.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftRetryCacheTests;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.RaftUtils;
+
+import java.io.IOException;
+
+public class TestRetryCacheWithNettyRpc extends RaftRetryCacheTests {
+  static {
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  private final MiniRaftClusterWithNetty cluster;
+
+  public TestRetryCacheWithNettyRpc() throws IOException {
+    cluster = MiniRaftClusterWithNetty.FACTORY.newCluster(
+        NUM_SERVERS, getProperties());
+  }
+
+  @Override
+  public MiniRaftClusterWithNetty getCluster() {
+    return cluster;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 66e06e0..be0c6bc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -17,11 +17,13 @@
  */
 package org.apache.ratis.server;
 
+import org.apache.ratis.conf.ConfUtils;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
 
 import static org.apache.ratis.conf.ConfUtils.*;
 
@@ -185,6 +187,25 @@ public interface RaftServerConfigKeys {
     }
   }
 
+  /** server retry cache related */
+  interface RetryCache {
+    String PREFIX = RaftServerConfigKeys.PREFIX + ".retrycache";
+
+    String CAPACITY_KEY = PREFIX + ".capacity";
+    int CAPACITY_DEFAULT = 4096;
+    static int capacity(RaftProperties properties) {
+      return ConfUtils.getInt(properties::getInt, CAPACITY_KEY, CAPACITY_DEFAULT,
+          ConfUtils.requireMin(0));
+    }
+
+    String EXPIRYTIME_KEY = PREFIX + ".expirytime";
+    TimeDuration EXPIRYTIME_DEFAULT = TimeDuration.valueOf(60, TimeUnit.SECONDS);
+    static TimeDuration expiryTime(RaftProperties properties) {
+      return getTimeDuration(properties.getTimeDuration(EXPIRYTIME_DEFAULT.getUnit()),
+          EXPIRYTIME_KEY, EXPIRYTIME_DEFAULT);
+    }
+  }
+
   static void main(String[] args) {
     printAll(RaftServerConfigKeys.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index f6c9ade..e7704fc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -31,7 +31,6 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -516,9 +515,8 @@ public class LeaderState {
     return pending;
   }
 
-  void replyPendingRequest(long logIndex,
-      CompletableFuture<Message> stateMachineFuture) {
-    pendingRequests.replyPendingRequest(logIndex, stateMachineFuture);
+  void replyPendingRequest(long logIndex, RaftClientReply reply) {
+    pendingRequests.replyPendingRequest(logIndex, reply);
   }
 
   TransactionContext getTransactionContext(long index) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
index f5581b9..1d51e54 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -60,6 +60,9 @@ public class PendingRequest implements Comparable<PendingRequest> {
     return entry;
   }
 
+  /**
+   * This is only used when setting new raft configuration.
+   */
   synchronized void setException(Throwable e) {
     RaftUtils.assertTrue(e != null);
     future.completeExceptionally(e);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index d4b74f2..c56d7a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -88,23 +88,11 @@ class PendingRequests {
     return pendingRequest != null ? pendingRequest.getEntry() : null;
   }
 
-  void replyPendingRequest(long index,
-      CompletableFuture<Message> stateMachineFuture) {
+  void replyPendingRequest(long index, RaftClientReply reply) {
     final PendingRequest pending = pendingRequests.get(index);
     if (pending != null) {
       RaftUtils.assertTrue(pending.getIndex() == index);
-
-      stateMachineFuture.whenComplete((reply, exception) -> {
-        if (exception == null) {
-          pending.setSuccessReply(reply);
-        } else {
-          // the exception is coming from the state machine. wrap it into the
-          // reply as a StateMachineException
-          final StateMachineException e = new StateMachineException(
-              server.getId().toString(), exception);
-          pending.setReply(new RaftClientReply(pending.getRequest(), e));
-        }
-      });
+      pending.setReply(reply);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 082c1bf..ca1b6d3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -27,6 +27,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
@@ -35,6 +36,7 @@ import org.apache.ratis.util.CodeInjectionForTesting;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.RaftUtils;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,6 +90,8 @@ public class RaftServerImpl implements RaftServer {
 
   private final ServerFactory factory;
 
+  private final RetryCache retryCache;
+
   RaftServerImpl(RaftPeerId id, StateMachine stateMachine,
       RaftConfiguration raftConf, RaftProperties properties, Parameters parameters)
       throws IOException {
@@ -103,6 +107,13 @@ public class RaftServerImpl implements RaftServer {
     final RpcType rpcType = RaftConfigKeys.Rpc.type(properties);
     this.factory = ServerFactory.cast(rpcType.newFactory(properties, parameters));
     this.serverRpc = initRaftServerRpc();
+    retryCache = initRetryCache(properties);
+  }
+
+  private RetryCache initRetryCache(RaftProperties prop) {
+    final int capacity = RaftServerConfigKeys.RetryCache.capacity(prop);
+    final TimeDuration expireTime = RaftServerConfigKeys.RetryCache.expiryTime(prop);
+    return new RetryCache(capacity, expireTime);
   }
 
   @Override
@@ -133,6 +144,11 @@ public class RaftServerImpl implements RaftServer {
     return this.stateMachine;
   }
 
+  @VisibleForTesting
+  RetryCache getRetryCache() {
+    return retryCache;
+  }
+
   private RaftServerRpc initRaftServerRpc() {
     final RaftServerRpc rpc = getFactory().newRaftServerRpc(this);
     // add peers into rpc service
@@ -322,17 +338,26 @@ public class RaftServerImpl implements RaftServer {
    * @return null if the server is in leader state.
    */
   private CompletableFuture<RaftClientReply> checkLeaderState(
-      RaftClientRequest request) {
+      RaftClientRequest request, RetryCache.CacheEntry entry) {
     if (!isLeader()) {
       NotLeaderException exception = generateNotLeaderException();
-      CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
-      future.complete(new RaftClientReply(request, exception));
-      return future;
+      final RaftClientReply reply = new RaftClientReply(request, exception);
+      if (entry != null) {
+        entry.failWithReply(reply);
+      }
+      return entry != null ?
+          entry.getReplyFuture() : CompletableFuture.completedFuture(reply);
     } else {
       if (leaderState == null || !leaderState.isReady()) {
-        CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
-        future.completeExceptionally(new LeaderNotReadyException());
-        return future;
+        final Exception e = new LeaderNotReadyException();
+        if (entry != null) {
+          entry.failWithException(e);
+          return entry.getReplyFuture();
+        } else {
+          CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
+          future.completeExceptionally(e);
+          return future;
+        }
       }
     }
     return null;
@@ -360,15 +385,15 @@ public class RaftServerImpl implements RaftServer {
    * Handle a normal update request from client.
    */
   private CompletableFuture<RaftClientReply> appendTransaction(
-      RaftClientRequest request, TransactionContext entry)
-      throws RaftException {
+      RaftClientRequest request, TransactionContext context,
+      RetryCache.CacheEntry retryEntry) throws RaftException {
     LOG.debug("{}: receive client request({})", getId(), request);
     lifeCycle.assertCurrentState(RUNNING);
     CompletableFuture<RaftClientReply> reply;
 
     final PendingRequest pending;
     synchronized (this) {
-      reply = checkLeaderState(request);
+      reply = checkLeaderState(request, retryEntry);
       if (reply != null) {
         return reply;
       }
@@ -376,14 +401,19 @@ public class RaftServerImpl implements RaftServer {
       // append the message to its local log
       final long entryIndex;
       try {
-        entryIndex = state.applyLog(entry, request.getClientId(),
+        entryIndex = state.applyLog(context, request.getClientId(),
             request.getCallId());
       } catch (IOException e) {
-        throw new RaftException(e);
+        // TODO looks like the IOException is actually only thrown by the SM in
+        // the preAppend stage. In that case we should wrap the exception in
+        // StateMachineException and return the exception in a RaftClientReply.
+        RaftException re = new RaftException(e);
+        retryEntry.failWithException(re);
+        throw re;
       }
 
       // put the request into the pending queue
-      pending = leaderState.addPendingRequest(entryIndex, request, entry);
+      pending = leaderState.addPendingRequest(entryIndex, request, context);
       leaderState.notifySenders();
     }
     return pending.getFuture();
@@ -393,27 +423,41 @@ public class RaftServerImpl implements RaftServer {
   public CompletableFuture<RaftClientReply> submitClientRequestAsync(
       RaftClientRequest request) throws IOException {
     // first check the server's leader state
-    CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
+    CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null);
     if (reply != null) {
       return reply;
     }
 
     // let the state machine handle read-only request from client
     if (request.isReadOnly()) {
-      // TODO: We might not be the leader anymore by the time this completes. See the RAFT paper,
-      // section 8 (last part)
+      // TODO: We might not be the leader anymore by the time this completes.
+      // See the RAFT paper section 8 (last part)
       return stateMachine.query(request);
     }
 
-    // TODO: this client request will not be added to pending requests
-    // until later which means that any failure in between will leave partial state in the
-    // state machine. We should call cancelTransaction() for failed requests
-    TransactionContext entry = stateMachine.startTransaction(request);
-    if (entry.getException().isPresent()) {
-      throw RaftUtils.asIOException(entry.getException().get());
+    // query the retry cache
+    RetryCache.CacheQueryResult previousResult = retryCache.queryCache(
+        request.getClientId(), request.getCallId());
+    if (previousResult.isRetry()) {
+      // if the previous attempt is still pending or it succeeded, return its
+      // future
+      return previousResult.getEntry().getReplyFuture();
+    }
+    final RetryCache.CacheEntry cacheEntry = previousResult.getEntry();
+
+    // TODO: this client request will not be added to pending requests until
+    // later which means that any failure in between will leave partial state in
+    // the state machine. We should call cancelTransaction() for failed requests
+    TransactionContext context = stateMachine.startTransaction(request);
+    if (context.getException().isPresent()) {
+      RaftClientReply exceptionReply = new RaftClientReply(request,
+          new StateMachineException(getId().toString(),
+              context.getException().get()));
+      cacheEntry.failWithReply(exceptionReply);
+      return CompletableFuture.completedFuture(exceptionReply);
     }
 
-    return appendTransaction(request, entry);
+    return appendTransaction(request, context, cacheEntry);
   }
 
   @Override
@@ -459,7 +503,7 @@ public class RaftServerImpl implements RaftServer {
       SetConfigurationRequest request) throws IOException {
     LOG.debug("{}: receive setConfiguration({})", getId(), request);
     lifeCycle.assertCurrentState(RUNNING);
-    CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
+    CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null);
     if (reply != null) {
       return reply;
     }
@@ -467,7 +511,7 @@ public class RaftServerImpl implements RaftServer {
     final RaftPeer[] peersInNewConf = request.getPeersInNewConf();
     final PendingRequest pending;
     synchronized (this) {
-      reply = checkLeaderState(request);
+      reply = checkLeaderState(request, null);
       if (reply != null) {
         return reply;
       }
@@ -799,11 +843,41 @@ public class RaftServerImpl implements RaftServer {
     }
   }
 
-  synchronized void replyPendingRequest(long logIndex,
+  /**
+   * The log has been submitted to the state machine. Use the future to update
+   * the pending requests and retry cache.
+   * @param logEntry the log entry that has been submitted to the state machine
+   * @param stateMachineFuture the future returned by the state machine
+   *                           from which we will get transaction result later
+   */
+  void replyPendingRequest(LogEntryProto logEntry,
       CompletableFuture<Message> stateMachineFuture) {
-    if (isLeader() && leaderState != null) { // is leader and is running
-      leaderState.replyPendingRequest(logIndex, stateMachineFuture);
-    }
+    // update the retry cache
+    final ClientId clientId = new ClientId(logEntry.getClientId().toByteArray());
+    final long callId = logEntry.getCallId();
+    final RaftPeerId serverId = getId();
+    final RetryCache.CacheEntry cacheEntry = retryCache.getOrCreateEntry(
+        clientId, logEntry.getCallId());
+    stateMachineFuture.whenComplete((reply, exception) -> {
+      final RaftClientReply r;
+      if (exception == null) {
+        r = new RaftClientReply(clientId, serverId, callId, true, reply, null);
+      } else {
+        // the exception is coming from the state machine. wrap it into the
+        // reply as a StateMachineException
+        final StateMachineException e = new StateMachineException(
+            getId().toString(), exception);
+        r = new RaftClientReply(clientId, serverId, callId, false, null, e);
+      }
+      // update retry cache
+      cacheEntry.updateResult(r);
+      // update pending request
+      synchronized (RaftServerImpl.this) {
+        if (isLeader() && leaderState != null) { // is leader and is running
+          leaderState.replyPendingRequest(logEntry.getIndex(), r);
+        }
+      }
+    });
   }
 
   TransactionContext getTransactionContext(long index) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
new file mode 100644
index 0000000..0372558
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.shaded.com.google.common.cache.Cache;
+import org.apache.ratis.shaded.com.google.common.cache.CacheBuilder;
+import org.apache.ratis.util.RaftUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Server-side cache of client replies keyed by (client ID, call ID). A request
+ * that shows up again with the same key is reported as a retry, together with
+ * the cache entry that tracks the earlier attempt.
+ *
+ * <p>The number of entries is bounded and each entry expires a fixed time
+ * after it was last written; see {@link #RetryCache(int, TimeDuration)}.
+ */
+public class RetryCache implements Closeable {
+  static final Logger LOG = LoggerFactory.getLogger(RetryCache.class);
+  /** The smallest capacity the underlying cache is ever built with. */
+  private static final int MIN_CAPACITY = 128;
+
+  /** Cache key: the (clientId, callId) pair identifying one client request. */
+  static class CacheKey {
+    private final ClientId clientId;
+    private final long callId;
+
+    CacheKey(ClientId clientId, long callId) {
+      this.clientId = clientId;
+      this.callId = callId;
+    }
+
+    @Override
+    public int hashCode() {
+      return clientId.hashCode() ^ Long.hashCode(callId);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj instanceof CacheKey) {
+        CacheKey e = (CacheKey) obj;
+        return e.clientId.equals(clientId) && callId == e.callId;
+      }
+      return false;
+    }
+
+    @Override
+    public String toString() {
+      return clientId.toString() + ":" + this.callId;
+    }
+  }
+
+  /**
+   * CacheEntry is tracked using unique client ID and callId of the RPC request.
+   * It holds a future that is completed with the reply for that request.
+   */
+  @VisibleForTesting
+  public static class CacheEntry {
+    private final CacheKey key;
+    private final CompletableFuture<RaftClientReply> replyFuture =
+        new CompletableFuture<>();
+
+    /**
+     * "failed" means we failed to commit the request into the raft group, or
+     * the request did not get approved by the state machine before the raft
+     * replication. Once the request gets committed by the raft group, this
+     * field is never true even if the state machine throws an exception when
+     * applying the transaction.
+     */
+    private volatile boolean failed = false;
+
+    CacheEntry(CacheKey key) {
+      this.key = key;
+    }
+
+    @Override
+    public String toString() {
+      return key + ":" + (isDone() ? "done" : "pending");
+    }
+
+    /** @return true if the entry has failed or its reply future is complete */
+    boolean isDone() {
+      return isFailed() || replyFuture.isDone();
+    }
+
+    /**
+     * Complete the entry with the reply produced for the request. The entry
+     * must not be complete yet; this is only checked when assertions are
+     * enabled.
+     */
+    void updateResult(RaftClientReply reply) {
+      // a cancelled future also reports isDone(), so this single check covers
+      // both the completed and the cancelled case
+      assert !replyFuture.isDone();
+      replyFuture.complete(reply);
+    }
+
+    /**
+     * @return true if the entry was marked failed or its reply future was
+     *         completed exceptionally
+     */
+    boolean isFailed() {
+      return failed || replyFuture.isCompletedExceptionally();
+    }
+
+    /** Mark the entry failed and complete it with the given reply. */
+    void failWithReply(RaftClientReply reply) {
+      failed = true;
+      replyFuture.complete(reply);
+    }
+
+    /** Mark the entry failed and complete it exceptionally with {@code t}. */
+    void failWithException(Throwable t) {
+      failed = true;
+      replyFuture.completeExceptionally(t);
+    }
+
+    CompletableFuture<RaftClientReply> getReplyFuture() {
+      return replyFuture;
+    }
+  }
+
+  /** Result of {@link #queryCache}: the entry to use and whether it is a retry. */
+  static class CacheQueryResult {
+    private final CacheEntry entry;
+    private final boolean isRetry;
+
+    CacheQueryResult(CacheEntry entry, boolean isRetry) {
+      this.entry = entry;
+      this.isRetry = isRetry;
+    }
+
+    public CacheEntry getEntry() {
+      return entry;
+    }
+
+    public boolean isRetry() {
+      return isRetry;
+    }
+  }
+
+  private final Cache<CacheKey, CacheEntry> cache;
+
+  /**
+   * @param capacity the maximum number of cached entries; values smaller than
+   *                 {@value #MIN_CAPACITY} are raised to that minimum
+   * @param expirationTime how long an entry stays in the cache after it was
+   *                       last written
+   */
+  RetryCache(int capacity, TimeDuration expirationTime) {
+    capacity = Math.max(capacity, MIN_CAPACITY);
+    cache = CacheBuilder.newBuilder().maximumSize(capacity)
+        .expireAfterWrite(expirationTime.toLong(TimeUnit.MILLISECONDS),
+            TimeUnit.MILLISECONDS).build();
+  }
+
+  /**
+   * Return the entry for the given call, creating a new pending entry if the
+   * cache holds none. The returned entry is checked (via
+   * {@code RaftUtils.assertTrue}) to be non-null and not yet done.
+   */
+  CacheEntry getOrCreateEntry(ClientId clientId, long callId) {
+    final CacheKey key = new CacheKey(clientId, callId);
+    final CacheEntry entry;
+    try {
+      entry = cache.get(key, () -> new CacheEntry(key));
+    } catch (ExecutionException e) {
+      throw new IllegalStateException(e);
+    }
+    RaftUtils.assertTrue(entry != null && !entry.isDone(),
+        "retry cache entry should be pending: %s", entry);
+    return entry;
+  }
+
+  /** Unconditionally store {@code newEntry} under its key and return it. */
+  private CacheEntry refreshEntry(CacheEntry newEntry) {
+    cache.put(newEntry.key, newEntry);
+    return newEntry;
+  }
+
+  /**
+   * Look up the cache on behalf of an incoming request.
+   *
+   * <p>If there is no entry for the call, or the existing entry has failed and
+   * has not been replaced meanwhile, a new pending entry is installed and
+   * returned as a non-retry. Otherwise the existing (pending, completed, or
+   * concurrently replaced) entry is returned as a retry.
+   */
+  CacheQueryResult queryCache(ClientId clientId, long callId) {
+    CacheKey key = new CacheKey(clientId, callId);
+    final CacheEntry newEntry = new CacheEntry(key);
+    CacheEntry cacheEntry;
+    try {
+      cacheEntry = cache.get(key, () -> newEntry);
+    } catch (ExecutionException e) {
+      throw new IllegalStateException(e);
+    }
+
+    if (cacheEntry == newEntry) {
+      // this is the entry we just newly created
+      return new CacheQueryResult(cacheEntry, false);
+    } else if (!cacheEntry.isDone() || !cacheEntry.isFailed()){
+      // the previous attempt is either pending or successful
+      return new CacheQueryResult(cacheEntry, true);
+    }
+
+    // the previous attempt failed, replace it with a new one.
+    synchronized (this) {
+      // need to recheck, since there may be other retry attempts being
+      // processed at the same time. The recheck+replacement should be protected
+      // by lock.
+      CacheEntry currentEntry = cache.getIfPresent(key);
+      if (currentEntry == cacheEntry || currentEntry == null) {
+        // if the failed entry has not got replaced by another retry, or the
+        // failed entry got invalidated, we add a new cache entry
+        return new CacheQueryResult(refreshEntry(newEntry), false);
+      } else {
+        return new CacheQueryResult(currentEntry, true);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  long size() {
+    return cache.size();
+  }
+
+  @VisibleForTesting
+  CacheEntry get(ClientId clientId, long callId) {
+    return cache.getIfPresent(new CacheKey(clientId, callId));
+  }
+
+  /** Drop every cached entry. */
+  @Override
+  public synchronized void close() {
+    // cache is a final field assigned in the constructor, so it is never null
+    cache.invalidateAll();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index b4fc705..38436a2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -164,7 +164,7 @@ class StateMachineUpdater implements Runnable {
               // TODO: This step can be parallelized
               CompletableFuture<Message> stateMachineFuture =
                   stateMachine.applyTransaction(trx);
-              server.replyPendingRequest(next.getIndex(), stateMachineFuture);
+              server.replyPendingRequest(next, stateMachineFuture);
             }
             lastAppliedIndex++;
           } else {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java
new file mode 100644
index 0000000..66c54cb
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.ratis.MiniRaftCluster.PeerChanges;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static java.util.Arrays.asList;
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
+
+public abstract class RaftRetryCacheTests {
+  public static final Logger LOG = LoggerFactory.getLogger(RaftRetryCacheTests.class);
+
+  public static final int NUM_SERVERS = 3;
+  protected static final RaftProperties properties = new RaftProperties();
+
+  public abstract MiniRaftCluster getCluster();
+
+  public RaftProperties getProperties() {
+    return properties;
+  }
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(120 * 1000);
+
+  @Before
+  public void setup() throws IOException {
+    Assert.assertNull(getCluster().getLeader());
+    getCluster().start();
+  }
+
+  @After
+  public void tearDown() {
+    final MiniRaftCluster cluster = getCluster();
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * make sure the retry cache can correct capture the retry from a client,
+   * and returns the result from the previous request
+   */
+  @Test
+  public void testBasicRetry() throws Exception {
+    final MiniRaftCluster cluster = getCluster();
+    RaftTestUtil.waitForLeader(cluster);
+
+    final RaftPeerId leaderId = cluster.getLeader().getId();
+    RaftClient client = cluster.createClient(leaderId);
+    client.send(new RaftTestUtil.SimpleMessage("first msg to make leader ready"));
+    long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
+
+    final RaftClientRpc rpc = client.getClientRpc();
+    final long callId = 999;
+    RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
+        callId, new RaftTestUtil.SimpleMessage("message"));
+    RaftClientReply reply = rpc.sendRequest(r);
+    Assert.assertEquals(callId, reply.getCallId());
+    Assert.assertTrue(reply.isSuccess());
+
+    // retry with the same callId
+    for (int i = 0; i < 5; i++) {
+      reply = rpc.sendRequest(r);
+      Assert.assertEquals(client.getId(), reply.getClientId());
+      Assert.assertEquals(callId, reply.getCallId());
+      Assert.assertTrue(reply.isSuccess());
+    }
+
+    long leaderApplied = cluster.getLeader().getState().getLastAppliedIndex();
+    // make sure retry cache has the entry
+    for (RaftServerImpl server : cluster.getServers()) {
+      LOG.info("check server " + server.getId());
+      if (server.getState().getLastAppliedIndex() < leaderApplied) {
+        Thread.sleep(1000);
+      }
+      Assert.assertEquals(2, RaftServerTestUtil.getRetryCacheSize(server));
+      Assert.assertNotNull(
+          RaftServerTestUtil.getRetryEntry(server, client.getId(), callId));
+      // make sure there is only one log entry committed
+      Assert.assertEquals(oldLastApplied + 1,
+          server.getState().getLastAppliedIndex());
+    }
+  }
+
+  /**
+   * Test retry while the leader changes to another peer
+   */
+  @Test
+  public void testRetryOnNewLeader() throws Exception {
+    final MiniRaftCluster cluster = getCluster();
+    RaftTestUtil.waitForLeader(cluster);
+
+    final RaftPeerId leaderId = cluster.getLeader().getId();
+    RaftClient client = cluster.createClient(leaderId);
+    client.send(new RaftTestUtil.SimpleMessage("first msg to make leader ready"));
+
+    RaftClientRpc rpc = client.getClientRpc();
+    final long callId = 999;
+    RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
+        callId, new RaftTestUtil.SimpleMessage("message"));
+    RaftClientReply reply = rpc.sendRequest(r);
+    Assert.assertEquals(callId, reply.getCallId());
+    Assert.assertTrue(reply.isSuccess());
+    long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
+
+    // trigger the reconfiguration, make sure the original leader is kicked out
+    PeerChanges change = cluster.addNewPeers(2, true);
+    RaftPeer[] allPeers = cluster.removePeers(2, true,
+        asList(change.newPeers)).allPeersInNewConf;
+    // trigger setConfiguration
+    SetConfigurationRequest request = new SetConfigurationRequest(
+        client.getId(), cluster.getLeader().getId(), DEFAULT_CALLID, allPeers);
+    LOG.info("Start changing the configuration: {}", request);
+    cluster.getLeader().setConfiguration(request);
+
+    RaftTestUtil.waitForLeader(cluster);
+    final RaftPeerId newLeaderId = cluster.getLeader().getId();
+    Assert.assertNotEquals(leaderId, newLeaderId);
+    // same clientId and callId in the request
+    r = new RaftClientRequest(client.getId(), newLeaderId,
+        callId, new RaftTestUtil.SimpleMessage("message"));
+    for (int i = 0; i < 10; i++) {
+      try {
+        reply = rpc.sendRequest(r);
+        LOG.info("successfully sent out the retry request_" + i);
+        Assert.assertEquals(client.getId(), reply.getClientId());
+        Assert.assertEquals(callId, reply.getCallId());
+        Assert.assertTrue(reply.isSuccess());
+      } catch (Exception e) {
+        LOG.info("hit exception while retrying the same request: " + e);
+      }
+      Thread.sleep(100);
+    }
+
+    // check the new leader and make sure the retry did not get committed
+    Assert.assertEquals(oldLastApplied + 3,
+        cluster.getLeader().getState().getLastAppliedIndex());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 012cfd7..0762f21 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -19,6 +19,7 @@ package org.apache.ratis.server.impl;
 
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -69,4 +70,17 @@ public class RaftServerTestUtil {
       RaftConfiguration initialConf) {
     return new ConfigurationManager(initialConf);
   }
+
+  /** Returns how many entries the retry cache of {@code server} currently holds. */
+  public static long getRetryCacheSize(RaftServerImpl server) {
+    final RetryCache retryCache = server.getRetryCache();
+    return retryCache.size();
+  }
+
+  /**
+   * Looks up the retry cache entry that {@code server} keeps for a client call.
+   *
+   * @return the cached entry, or null if the cache does not hold one
+   */
+  public static RetryCache.CacheEntry getRetryEntry(RaftServerImpl server,
+      ClientId clientId, long callId) {
+    final RetryCache retryCache = server.getRetryCache();
+    return retryCache.get(clientId, callId);
+  }
+
+  /** Exposes the package-private {@code isFailed} check of a cache entry to tests. */
+  public static boolean isRetryCacheEntryFailed(RetryCache.CacheEntry entry) {
+    final boolean entryFailed = entry.isFailed();
+    return entryFailed;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java
new file mode 100644
index 0000000..5e896e2
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftRetryCacheTests;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.RaftUtils;
+
+import java.io.IOException;
+
+/**
+ * Runs the {@link RaftRetryCacheTests} suite on a cluster built by
+ * {@link MiniRaftClusterWithSimulatedRpc}.
+ */
+public class TestRetryCacheWithSimulatedRpc extends RaftRetryCacheTests {
+  static {
+    // turn on debug logging for server and client during these tests
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  private final MiniRaftClusterWithSimulatedRpc cluster;
+
+  public TestRetryCacheWithSimulatedRpc() throws IOException {
+    this.cluster = MiniRaftClusterWithSimulatedRpc.FACTORY
+        .newCluster(NUM_SERVERS, getProperties());
+  }
+
+  /** @return the simulated-RPC cluster created in the constructor */
+  @Override
+  public MiniRaftClusterWithSimulatedRpc getCluster() {
+    return this.cluster;
+  }
+}