You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2017/12/05 00:56:17 UTC

incubator-ratis git commit: RATIS-162. Clean up reply exception handling code for RATIS-140.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 1a74e13ea -> 8f0a3ed50


RATIS-162. Clean up reply exception handling code for RATIS-140.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/8f0a3ed5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/8f0a3ed5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/8f0a3ed5

Branch: refs/heads/master
Commit: 8f0a3ed50aa7a347558551ab5ce3a8bc2e8e92a0
Parents: 1a74e13
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Tue Dec 5 08:55:56 2017 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Tue Dec 5 08:55:56 2017 +0800

----------------------------------------------------------------------
 .../org/apache/ratis/client/RaftClient.java     |  8 +-
 .../org/apache/ratis/client/RaftClientRpc.java  |  2 +-
 .../ratis/client/impl/ClientImplUtils.java      |  5 +-
 .../ratis/client/impl/ClientProtoUtils.java     |  9 ++-
 .../ratis/client/impl/RaftClientImpl.java       | 79 ++++++++++++--------
 .../client/impl/RaftClientRpcWithProxy.java     |  4 +-
 .../ratis/protocol/LeaderNotReadyException.java |  4 +-
 .../apache/ratis/protocol/RaftClientReply.java  | 40 +++++-----
 .../java/org/apache/ratis/protocol/RaftId.java  |  5 +-
 .../java/org/apache/ratis/util/JavaUtils.java   | 32 ++++++++
 .../java/org/apache/ratis/util/LogUtils.java    | 10 ++-
 .../ratis/util/UncheckedAutoCloseable.java      | 27 +++++++
 .../examples/filestore/FileStoreClient.java     |  6 +-
 .../TestRaftStateMachineException.java          |  5 +-
 .../org/apache/ratis/grpc/RaftGrpcUtil.java     | 12 +--
 .../ratis/grpc/client/AppendStreamer.java       |  7 +-
 .../apache/ratis/grpc/client/GrpcClientRpc.java | 45 +++++------
 .../grpc/client/RaftClientProtocolClient.java   | 22 ++++--
 .../grpc/client/RaftClientProtocolProxy.java    |  6 +-
 .../ratis/server/impl/PendingRequests.java      |  3 +-
 .../ratis/server/impl/RaftServerImpl.java       |  6 +-
 .../apache/ratis/server/impl/RetryCache.java    |  6 +-
 .../ratis/server/impl/ServerProtoUtils.java     |  2 +-
 .../apache/ratis/server/impl/ServerState.java   | 21 ++++--
 .../java/org/apache/ratis/MiniRaftCluster.java  |  9 +++
 .../org/apache/ratis/RaftExceptionBaseTest.java | 13 ++--
 .../impl/RaftReconfigurationBaseTest.java       |  2 +-
 .../server/simulation/SimulatedClientRpc.java   |  2 +-
 28 files changed, 242 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index d2c5c1a..89fb8f4 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -21,9 +21,8 @@ import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.impl.ClientImplUtils;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.rpc.RpcType;
-import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.rpc.RpcType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,7 +82,6 @@ public interface RaftClient extends Closeable {
     private RaftGroup group;
     private RaftPeerId leaderId;
     private RaftProperties properties;
-    private TimeDuration retryInterval = RaftClientConfigKeys.Rpc.TIMEOUT_DEFAULT;
     private Parameters parameters;
 
     private Builder() {}
@@ -94,8 +92,6 @@ public interface RaftClient extends Closeable {
         clientId = ClientId.randomId();
       }
       if (properties != null) {
-        retryInterval = RaftClientConfigKeys.Rpc.timeout(properties);
-
         if (clientRpc == null) {
           final RpcType rpcType = RaftConfigKeys.Rpc.type(properties);
           final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(parameters));
@@ -106,7 +102,7 @@ public interface RaftClient extends Closeable {
           Objects.requireNonNull(group, "The 'group' field is not initialized."),
           leaderId,
           Objects.requireNonNull(clientRpc, "The 'clientRpc' field is not initialized."),
-          retryInterval, properties);
+          properties);
     }
 
     /** Set {@link RaftClient} ID. */

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
index 51f4430..c6c2d61 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
@@ -40,5 +40,5 @@ public interface RaftClientRpc extends Closeable {
   void addServers(Iterable<RaftPeer> servers);
 
   /** Handle the given exception.  For example, try reconnecting. */
-  void handleException(RaftPeerId serverId, Exception e);
+  void handleException(RaftPeerId serverId, Exception e, boolean shouldClose);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
index e7c89f3..d813650 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
@@ -28,8 +28,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 /** Client utilities for internal use. */
 public class ClientImplUtils {
   public static RaftClient newRaftClient(ClientId clientId, RaftGroup group,
-      RaftPeerId leaderId, RaftClientRpc clientRpc, TimeDuration retryInterval,
-      RaftProperties properties) {
-    return new RaftClientImpl(clientId, group, leaderId, clientRpc, retryInterval, properties);
+      RaftPeerId leaderId, RaftClientRpc clientRpc, RaftProperties properties) {
+    return new RaftClientImpl(clientId, group, leaderId, clientRpc, properties);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 2d7c13e..43c06de 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -109,8 +109,10 @@ public class ClientProtoUtils {
       if (reply.getMessage() != null) {
         b.setMessage(toClientMessageEntryProtoBuilder(reply.getMessage()));
       }
-      if (reply.isNotLeader()) {
-        NotLeaderException nle = reply.getNotLeaderException();
+
+      final NotLeaderException nle = reply.getNotLeaderException();
+      final StateMachineException sme;
+      if (nle != null) {
         NotLeaderExceptionProto.Builder nleBuilder =
             NotLeaderExceptionProto.newBuilder();
         final RaftPeer suggestedLeader = nle.getSuggestedLeader();
@@ -120,8 +122,7 @@ public class ClientProtoUtils {
         nleBuilder.addAllPeersInConf(
             ProtoUtils.toRaftPeerProtos(Arrays.asList(nle.getPeers())));
         b.setNotLeaderException(nleBuilder.build());
-      } else if (reply.hasStateMachineException()) {
-        StateMachineException sme = reply.getStateMachineException();
+      } else if ((sme = reply.getStateMachineException()) != null) {
         StateMachineExceptionProto.Builder smeBuilder =
             StateMachineExceptionProto.newBuilder();
         final Throwable t = sme.getCause() != null ? sme.getCause() : sme;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 9c66a9f..ba1a107 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -32,6 +32,7 @@ import java.io.InterruptedIOException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
 
@@ -56,14 +57,15 @@ final class RaftClientImpl implements RaftClient {
   private final Semaphore asyncRequestSemaphore;
 
   RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId,
-      RaftClientRpc clientRpc, TimeDuration retryInterval, RaftProperties properties) {
+      RaftClientRpc clientRpc, RaftProperties properties) {
     this.clientId = clientId;
     this.clientRpc = clientRpc;
     this.peers = new ConcurrentLinkedQueue<>(group.getPeers());
     this.groupId = group.getGroupId();
     this.leaderId = leaderId != null? leaderId
         : !peers.isEmpty()? peers.iterator().next().getId(): null;
-    this.retryInterval = retryInterval;
+    this.retryInterval = RaftClientConfigKeys.Rpc.timeout(properties);
+
     asyncRequestSemaphore = new Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
     scheduler = Executors.newScheduledThreadPool(RaftClientConfigKeys.Async.schedulerThreads(properties));
     clientRpc.addServers(peers);
@@ -101,12 +103,8 @@ final class RaftClientImpl implements RaftClient {
     final long seqNum = nextSeqNum();
     return sendRequestWithRetryAsync(
         () -> new RaftClientRequest(clientId, leaderId, groupId, callId, seqNum, message, readOnly)
-    ).thenApply(reply -> {
-      if (reply.hasStateMachineException() || reply.hasGroupMismatchException()) {
-        throw new CompletionException(reply.getException());
-      }
-      return reply;
-    }).whenComplete((r, e) -> asyncRequestSemaphore.release());
+    ).thenApply(reply -> handleStateMachineException(reply, CompletionException::new)
+    ).whenComplete((r, e) -> asyncRequestSemaphore.release());
   }
 
   @Override
@@ -203,15 +201,12 @@ final class RaftClientImpl implements RaftClient {
 
   private CompletableFuture<RaftClientReply> sendRequestAsync(
       RaftClientRequest request) {
-    LOG.debug("{}: sendAsync {}", clientId, request);
+    LOG.debug("{}: send* {}", clientId, request);
     return clientRpc.sendRequestAsync(request).thenApply(reply -> {
-      LOG.debug("{}: receive {}", clientId, reply);
-      if (reply != null && reply.isNotLeader()) {
-        handleNotLeaderException(request, reply.getNotLeaderException());
-        return null;
-      }
-      return reply;
+      LOG.debug("{}: receive* {}", clientId, reply);
+      return handleNotLeaderException(request, reply);
     }).exceptionally(e -> {
+      LOG.debug("{}: Failed {} with {}", clientId, request, e);
       final Throwable cause = e.getCause();
       if (cause instanceof GroupMismatchException) {
         return new RaftClientReply(request, (RaftException) cause);
@@ -233,26 +228,40 @@ final class RaftClientImpl implements RaftClient {
     } catch (IOException ioe) {
       handleIOException(request, ioe, null);
     }
+    LOG.debug("{}: receive {}", clientId, reply);
+    reply = handleNotLeaderException(request, reply);
+    reply = handleStateMachineException(reply, Function.identity());
+    return reply;
+  }
+
+  static <E extends Throwable> RaftClientReply handleStateMachineException(
+      RaftClientReply reply, Function<StateMachineException, E> converter) throws E {
     if (reply != null) {
-      LOG.debug("{}: receive {}", clientId, reply);
-      if (reply.isNotLeader()) {
-        handleNotLeaderException(request, reply.getNotLeaderException());
-        return null;
-      } else if (reply.hasStateMachineException()) {
-        throw reply.getStateMachineException();
-      } else {
-        return reply;
+      final StateMachineException sme = reply.getStateMachineException();
+      if (sme != null) {
+        throw converter.apply(sme);
       }
     }
-    return null;
+    return reply;
   }
 
-  private void handleNotLeaderException(RaftClientRequest request,
-      NotLeaderException nle) {
+  /**
+   * @return null if the reply is null or it has {@link NotLeaderException};
+   *         otherwise return the same reply.
+   */
+  private RaftClientReply handleNotLeaderException(RaftClientRequest request, RaftClientReply reply) {
+    if (reply == null) {
+      return null;
+    }
+    final NotLeaderException nle = reply.getNotLeaderException();
+    if (nle == null) {
+      return reply;
+    }
     refreshPeers(Arrays.asList(nle.getPeers()));
     final RaftPeerId newLeader = nle.getSuggestedLeader() == null ? null
         : nle.getSuggestedLeader().getId();
     handleIOException(request, nle, newLeader);
+    return null;
   }
 
   private void refreshPeers(Collection<RaftPeer> newPeers) {
@@ -266,23 +275,29 @@ final class RaftClientImpl implements RaftClient {
 
   private void handleIOException(RaftClientRequest request, IOException ioe,
       RaftPeerId newLeader) {
-    LOG.debug("{}: suggested new leader: {}. Failed with {}", clientId,
-        newLeader, ioe);
+    LOG.debug("{}: suggested new leader: {}. Failed {} with {}",
+        clientId, newLeader, request, ioe);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Stack trace", new Throwable("TRACE"));
     }
 
-    final RaftPeerId oldLeader = request.getServerId();
-    clientRpc.handleException(oldLeader, ioe);
+    if (ioe instanceof LeaderNotReadyException) {
+      return;
+    }
 
-    if (newLeader == null && oldLeader.equals(leaderId)) {
+    final RaftPeerId oldLeader = request.getServerId();
+    final boolean stillLeader = oldLeader.equals(leaderId);
+    if (newLeader == null && stillLeader) {
       newLeader = CollectionUtils.random(oldLeader,
           CollectionUtils.as(peers, RaftPeer::getId));
     }
-    if (newLeader != null && oldLeader.equals(leaderId)) {
+
+    final boolean changeLeader = newLeader != null && stillLeader;
+    if (changeLeader) {
       LOG.debug("{}: change Leader from {} to {}", clientId, oldLeader, newLeader);
       this.leaderId = newLeader;
     }
+    clientRpc.handleException(oldLeader, ioe, changeLeader);
   }
 
   void assertAsyncRequestSemaphore(int expectedAvailablePermits, int expectedQueueLength) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
index 835a876..6f4f37f 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
@@ -48,8 +48,8 @@ public abstract class RaftClientRpcWithProxy<PROXY extends Closeable>
   }
 
   @Override
-  public void handleException(RaftPeerId serverId, Exception e) {
-    if (ReflectionUtils.isInstance(e,
+  public void handleException(RaftPeerId serverId, Exception e, boolean shouldClose) {
+    if (shouldClose || ReflectionUtils.isInstance(e,
         SocketException.class, SocketTimeoutException.class,
         ClosedChannelException.class, EOFException.class)) {
       proxies.resetProxy(serverId);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
index 33f6a4d..55af3f4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
@@ -24,8 +24,8 @@ package org.apache.ratis.protocol;
  * it cannot determine whether a request is just a retry.
  */
 public class LeaderNotReadyException extends RaftException {
-  public LeaderNotReadyException() {
-    this("The leader is not ready yet");
+  public LeaderNotReadyException(RaftPeerId id) {
+    this(id + " is in LEADER state but not ready yet.");
   }
 
   public LeaderNotReadyException(String msg) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index ea59352..77a987d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -17,6 +17,10 @@
  */
 package org.apache.ratis.protocol;
 
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReflectionUtils;
+
 /**
  * Reply from server to client
  */
@@ -40,6 +44,14 @@ public class RaftClientReply extends RaftClientMessage {
     this.callId = callId;
     this.message = message;
     this.exception = exception;
+
+    if (exception != null) {
+      Preconditions.assertTrue(!success,
+          () -> "Inconsistent parameters: success && exception != null: " + this);
+      Preconditions.assertTrue(
+          ReflectionUtils.isInstance(exception, NotLeaderException.class, StateMachineException.class),
+          () -> "Unexpected exception class: " + this);
+    }
   }
 
   public RaftClientReply(RaftClientRequest request,
@@ -64,8 +76,8 @@ public class RaftClientReply extends RaftClientMessage {
 
   @Override
   public String toString() {
-    return super.toString() + ", callId: " + getCallId()
-        + ", success: " + isSuccess();
+    return super.toString() + ", cid=" + getCallId()
+        + ", success? " + isSuccess() + ", exception=" + exception;
   }
 
   public boolean isSuccess() {
@@ -76,29 +88,13 @@ public class RaftClientReply extends RaftClientMessage {
     return message;
   }
 
-  public boolean isNotLeader() {
-    return exception instanceof NotLeaderException;
-  }
-
+  /** If this reply has {@link NotLeaderException}, return it; otherwise return null. */
   public NotLeaderException getNotLeaderException() {
-    assert isNotLeader();
-    return (NotLeaderException) exception;
+    return JavaUtils.cast(exception, NotLeaderException.class);
   }
 
+  /** If this reply has {@link StateMachineException}, return it; otherwise return null. */
   public StateMachineException getStateMachineException() {
-    assert hasStateMachineException();
-    return (StateMachineException) exception;
-  }
-
-  public boolean hasStateMachineException() {
-    return exception instanceof StateMachineException;
-  }
-
-  public boolean hasGroupMismatchException(){
-    return exception instanceof GroupMismatchException;
-  }
-
-  public RaftException getException(){
-    return exception;
+    return JavaUtils.cast(exception, StateMachineException.class);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
index 0846856..4b45765 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
@@ -67,8 +67,11 @@ public abstract class RaftId {
     this(toUuid(uuidBytes), () -> uuidBytes);
   }
 
+  /** @return the last 12 hex digits. */
   String createUuidString(UUID uuid) {
-    return uuid.toString().toUpperCase();
+    final String s = uuid.toString().toUpperCase();
+    final int i = s.lastIndexOf('-');
+    return s.substring(i + 1);
   }
 
   public ByteString toByteString() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 89407eb..e910f28 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -27,8 +27,12 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 import java.util.Objects;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -40,6 +44,15 @@ public interface JavaUtils {
   Logger LOG = LoggerFactory.getLogger(JavaUtils.class);
 
   /**
+   * The same as {@link Class#cast(Object)} except that
+   * this method returns null (but not throw {@link ClassCastException})
+   * if the given object is not an instance of the given class.
+   */
+  static <T> T cast(Object obj, Class<T> clazz) {
+    return clazz.isInstance(obj)? clazz.cast(obj): null;
+  }
+
+  /**
    * Invoke {@link Callable#call()} and, if there any,
    * wrap the checked exception by {@link RuntimeException}.
    */
@@ -171,6 +184,20 @@ public interface JavaUtils {
     }
   }
 
+  Supplier<Timer> TIMER = memoize(() -> new Timer(true));
+
+  static UncheckedAutoCloseable runRepeatedly(Runnable runnable, long delay, long period, TimeUnit unit) {
+    final Timer timer = TIMER.get();
+    timer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        runnable.run();
+      }
+    }, unit.toMillis(delay), unit.toMillis(period));
+
+    return timer::cancel;
+  }
+
   static void dumpAllThreads(Consumer<String> println) {
     final ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
     for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) {
@@ -183,4 +210,9 @@ public interface JavaUtils {
     future.completeExceptionally(t);
     return future;
   }
+
+  static Throwable unwrapCompletionException(Throwable t) {
+    Objects.requireNonNull(t, "t == null");
+    return t instanceof CompletionException && t.getCause() != null? t.getCause(): t;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
index 6a4d833..e75b89b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
@@ -23,6 +23,7 @@ package org.apache.ratis.util;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -32,9 +33,16 @@ import java.util.function.Supplier;
  * Logging (as in log4j) related utility methods.
  */
 public interface LogUtils {
+  Logger LOG = LoggerFactory.getLogger(LogUtils.class);
 
   static void setLogLevel(Logger logger, Level level) {
-    LogManager.getLogger(logger.getName()).setLevel(level);
+    final String name = logger.getName();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("", new Throwable("Set " + name + " log level to " + level));
+    } else {
+      LOG.info("Set {} log level to {}", name, level);
+    }
+    LogManager.getLogger(name).setLevel(level);
   }
 
   static <THROWABLE extends Throwable> void runAndLog(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/util/UncheckedAutoCloseable.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/UncheckedAutoCloseable.java b/ratis-common/src/main/java/org/apache/ratis/util/UncheckedAutoCloseable.java
new file mode 100644
index 0000000..cc73159
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/UncheckedAutoCloseable.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+/**
+ * The same as {@link AutoCloseable}
+ * except that the close method does not throw {@link Exception}.
+ */
+public interface UncheckedAutoCloseable extends AutoCloseable {
+  @Override
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index 59d5079..380f315 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -22,6 +22,7 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.ratis.shaded.proto.ExamplesProtos.*;
 import org.apache.ratis.util.CheckedFunction;
@@ -57,8 +58,9 @@ public class FileStoreClient implements Closeable {
       ByteString request, CheckedFunction<Message, RaftClientReply, IOException> sendFunction)
       throws IOException {
     final RaftClientReply reply = sendFunction.apply(() -> request);
-    if (reply.hasStateMachineException()) {
-      throw new IOException("Failed to send request " + request, reply.getStateMachineException());
+    final StateMachineException sme = reply.getStateMachineException();
+    if (sme != null) {
+      throw new IOException("Failed to send request " + request, sme);
     }
     Preconditions.assertTrue(reply.isSuccess(), () -> "reply=" + reply);
     return reply.getMessage().getContent();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
index 158875d..b0e2b7c 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
@@ -35,6 +35,7 @@ import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
 import static org.junit.Assert.fail;
@@ -146,7 +147,7 @@ public class TestRaftStateMachineException extends ParameterizedBaseTest {
     RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
         cluster.getGroupId(), callId, seqNum, new SimpleMessage("message"));
     RaftClientReply reply = rpc.sendRequest(r);
-    Assert.assertTrue(reply.hasStateMachineException());
+    Objects.requireNonNull(reply.getStateMachineException());
 
     RetryCache.CacheEntry oldEntry = RaftServerTestUtil.getRetryEntry(
         cluster.getLeader(), client.getId(), callId);
@@ -155,7 +156,7 @@ public class TestRaftStateMachineException extends ParameterizedBaseTest {
 
     // retry
     reply = rpc.sendRequest(r);
-    Assert.assertTrue(reply.hasStateMachineException());
+    Objects.requireNonNull(reply.getStateMachineException());
 
     RetryCache.CacheEntry currentEntry = RaftServerTestUtil.getRetryEntry(
         cluster.getLeader(), client.getId(), callId);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
index d373ddb..5499878 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
@@ -22,10 +22,7 @@ import org.apache.ratis.shaded.io.grpc.Metadata;
 import org.apache.ratis.shaded.io.grpc.Status;
 import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.util.CheckedSupplier;
-import org.apache.ratis.util.IOUtils;
-import org.apache.ratis.util.ReflectionUtils;
-import org.apache.ratis.util.StringUtils;
+import org.apache.ratis.util.*;
 
 import java.io.IOException;
 import java.util.Objects;
@@ -38,12 +35,7 @@ public interface RaftGrpcUtil {
       Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
 
   static StatusRuntimeException wrapException(Throwable t) {
-    Objects.requireNonNull(t, "t == null");
-    if (t instanceof CompletionException) {
-      if (t.getCause() != null) {
-        t = t.getCause();
-      }
-    }
+    t = JavaUtils.unwrapCompletionException(t);
 
     Metadata trailers = new Metadata();
     trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName());

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
index 9c238f4..810121b 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -97,7 +97,7 @@ public class AppendStreamer implements Closeable {
     this.peers = group.getPeers().stream().collect(
         Collectors.toMap(RaftPeer::getId, Function.identity()));
     proxyMap = new PeerProxyMap<>(clientId.toString(),
-        raftPeer -> new RaftClientProtocolProxy(raftPeer, ResponseHandler::new));
+        raftPeer -> new RaftClientProtocolProxy(clientId, raftPeer, ResponseHandler::new));
     proxyMap.addPeers(group.getPeers());
     refreshLeaderProxy(leaderId, null);
 
@@ -277,10 +277,11 @@ public class AppendStreamer implements Closeable {
         } else {
           // this may be a NotLeaderException
           RaftClientReply r = ClientProtoUtils.toRaftClientReply(reply);
-          if (r.isNotLeader()) {
+          final NotLeaderException nle = r.getNotLeaderException();
+          if (nle != null) {
             LOG.debug("{} received a NotLeaderException from {}", this,
                 r.getServerId());
-            handleNotLeader(r.getNotLeaderException(), targetId);
+            handleNotLeader(nle, targetId);
           }
         }
         AppendStreamer.this.notifyAll();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 2b7de70..c5c188e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -23,13 +23,13 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.RaftGrpcUtil;
 import org.apache.ratis.protocol.*;
-import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
 import org.apache.ratis.shaded.proto.RaftProtos;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
 import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.PeerProxyMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,10 +43,13 @@ import static org.apache.ratis.client.impl.ClientProtoUtils.*;
 
 public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClient> {
   public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class);
+
+  private final ClientId clientId;
   private final int maxMessageSize;
 
   public GrpcClientRpc(ClientId clientId, RaftProperties properties) {
-    super(new PeerProxyMap<>(clientId.toString(), RaftClientProtocolClient::new));
+    super(new PeerProxyMap<>(clientId.toString(), p -> new RaftClientProtocolClient(clientId, p)));
+    this.clientId = clientId;
     maxMessageSize = GrpcConfigKeys.messageSizeMax(properties).getSizeInt();
   }
 
@@ -57,10 +60,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
     try {
       return sendRequestAsync(request, getProxies().getProxy(serverId));
     } catch (IOException e) {
-      final CompletableFuture<RaftClientReply> replyFuture =
-          new CompletableFuture<>();
-      replyFuture.completeExceptionally(e);
-      return replyFuture;
+      return JavaUtils.completeExceptionally(e);
     }
   }
 
@@ -83,16 +83,10 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
       return ClientProtoUtils.toServerInformationReply(
           proxy.serverInformation(proto));
     } else {
-      RaftClientRequestProto requestProto = toRaftClientRequestProto(request);
-      if (requestProto.getSerializedSize() > maxMessageSize) {
-        throw new IOException("msg size:" + requestProto.getSerializedSize() +
-            " exceeds maximum:" + maxMessageSize);
-      }
-      final CompletableFuture<RaftClientReply> replyFuture =
-                     sendRequestAsync(request, proxy);
+      final CompletableFuture<RaftClientReply> f = sendRequestAsync(request, proxy);
       // TODO: timeout support
       try {
-        return replyFuture.get();
+        return f.get();
       } catch (InterruptedException e) {
         throw new InterruptedIOException(
             "Interrupted while waiting for response of request " + request);
@@ -103,7 +97,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
   }
 
   private CompletableFuture<RaftClientReply> sendRequestAsync(
-      RaftClientRequest request, RaftClientProtocolClient proxy) {
+      RaftClientRequest request, RaftClientProtocolClient proxy) throws IOException {
     final RaftClientRequestProto requestProto =
         toRaftClientRequestProto(request);
     final CompletableFuture<RaftClientReplyProto> replyFuture =
@@ -117,22 +111,14 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
 
           @Override
           public void onError(Throwable t) {
-            // This implementation is used as RaftClientRpc. Retry
-            // logic on Exception is in RaftClient.
-            final IOException e;
-            if (t instanceof StatusRuntimeException) {
-              e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t);
-            } else {
-              e = IOUtils.asIOException(t);
-            }
-            replyFuture.completeExceptionally(e);
+            replyFuture.completeExceptionally(RaftGrpcUtil.unwrapIOException(t));
           }
 
           @Override
           public void onCompleted() {
             if (!replyFuture.isDone()) {
               replyFuture.completeExceptionally(
-                  new IOException("No reply for request " + request));
+                  new IOException(clientId + ": Stream completed but no reply for request " + request));
             }
           }
         });
@@ -141,4 +127,13 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
 
     return replyFuture.thenApply(replyProto -> toRaftClientReply(replyProto));
   }
+
+  RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest request) throws IOException {
+    final RaftClientRequestProto proto = ClientProtoUtils.toRaftClientRequestProto(request);
+    if (proto.getSerializedSize() > maxMessageSize) {
+      throw new IOException(clientId + ": Message size:" + proto.getSerializedSize()
+          + " exceeds maximum:" + maxMessageSize);
+    }
+    return proto;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
index 12a3a5b..ace90f2 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
@@ -18,35 +18,39 @@
 package org.apache.ratis.grpc.client;
 
 import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.shaded.io.grpc.ManagedChannel;
 import org.apache.ratis.shaded.io.grpc.ManagedChannelBuilder;
 import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc;
 import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceBlockingStub;
 import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc;
 import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub;
 import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub;
 import org.apache.ratis.util.CheckedSupplier;
+import org.apache.ratis.util.JavaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.function.Supplier;
 
 public class RaftClientProtocolClient implements Closeable {
+  public static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolClient.class);
+
+  private final Supplier<String> name;
   private final RaftPeer target;
   private final ManagedChannel channel;
   private final RaftClientProtocolServiceBlockingStub blockingStub;
   private final RaftClientProtocolServiceStub asyncStub;
   private final AdminProtocolServiceBlockingStub adminBlockingStub;
 
-  public RaftClientProtocolClient(RaftPeer target) {
+  public RaftClientProtocolClient(ClientId id, RaftPeer target) {
+    this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
     this.target = target;
     channel = ManagedChannelBuilder.forTarget(target.getAddress())
         .usePlaintext(true).build();
@@ -55,6 +59,10 @@ public class RaftClientProtocolClient implements Closeable {
     adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel);
   }
 
+  String getName() {
+    return name.get();
+  }
+
   @Override
   public void close() {
     channel.shutdownNow();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
index 6892c71..297fe26 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.grpc.client;
 
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
@@ -31,9 +32,10 @@ public class RaftClientProtocolProxy implements Closeable {
   private final Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation;
   private RpcSession currentSession;
 
-  public RaftClientProtocolProxy(RaftPeer target,
+  public RaftClientProtocolProxy(
+      ClientId clientId, RaftPeer target,
       Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation) {
-    proxy = new RaftClientProtocolClient(target);
+    proxy = new RaftClientProtocolClient(clientId, target);
     this.responseHandlerCreation = responseHandlerCreation;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index f651230..1dead9d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -21,6 +21,7 @@ import org.apache.ratis.protocol.*;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -29,7 +30,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 class PendingRequests {
-  private static final Logger LOG = RaftServerImpl.LOG;
+  public static final Logger LOG = LoggerFactory.getLogger(PendingRequests.class);
 
   private static class RequestMap {
     private final Object name;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 122ff51..bc45b5f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -367,7 +367,7 @@ public class RaftServerImpl implements RaftServerProtocol,
       return RetryCache.failWithReply(reply, entry);
     } else {
       if (leaderState == null || !leaderState.isReady()) {
-        return RetryCache.failWithException(new LeaderNotReadyException(), entry);
+        return RetryCache.failWithException(new LeaderNotReadyException(getId()), entry);
       }
     }
     return null;
@@ -732,8 +732,8 @@ public class RaftServerImpl implements RaftServerProtocol,
         final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
             leaderId, getId(), groupId, currentTerm, nextIndex, NOT_LEADER);
         if (LOG.isDebugEnabled()) {
-          LOG.debug("{}: do not recognize leader. Reply: {}",
-              getId(), ProtoUtils.toString(reply));
+          LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} reply: {}",
+              getId(), leaderId, leaderTerm, state, ProtoUtils.toString(reply));
         }
         return reply;
       }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
index afe2a7e..5abfc1a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
@@ -27,6 +27,7 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.shaded.com.google.common.cache.Cache;
 import org.apache.ratis.shaded.com.google.common.cache.CacheBuilder;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
@@ -243,10 +244,7 @@ public class RetryCache implements Closeable {
       entry.failWithException(t);
       return entry.getReplyFuture();
     } else {
-      final CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
-      future.completeExceptionally(t);
-      return future;
+      return JavaUtils.completeExceptionally(t);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 9f84e05..3e44c76 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -64,7 +64,7 @@ public class ServerProtoUtils {
     final ByteString clientId = entry.getClientId();
     return toTermIndexString(entry) + entry.getLogEntryBodyCase()
         + ", " + (clientId.isEmpty()? "<empty clientId>": ClientId.valueOf(clientId))
-        + ", callId=" + entry.getCallId();
+        + ", cid=" + entry.getCallId();
   }
 
   public static String toString(LogEntryProto... entries) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index e83a931..a51d4b9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -35,8 +35,10 @@ import org.apache.ratis.util.ProtoUtils;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.util.Objects;
 import java.util.function.Consumer;
 
+import static org.apache.ratis.server.impl.RaftServerImpl.LOG;
 import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
 
 /**
@@ -169,7 +171,7 @@ public class ServerState implements Closeable {
     if (newTerm > currentTerm) {
       currentTerm = newTerm;
       votedFor = null;
-      leaderId = null;
+      setLeader(null);
       return true;
     }
     return false;
@@ -188,7 +190,7 @@ public class ServerState implements Closeable {
    */
   long initElection() {
     votedFor = selfId;
-    leaderId = null;
+    setLeader(null);
     return ++currentTerm;
   }
 
@@ -201,15 +203,18 @@ public class ServerState implements Closeable {
    */
   void grantVote(RaftPeerId candidateId) {
     votedFor = candidateId;
-    leaderId = null;
+    setLeader(null);
   }
 
-  void setLeader(RaftPeerId leaderId) {
-    this.leaderId = leaderId;
+  void setLeader(RaftPeerId newLeaderId) {
+    if (!Objects.equals(leaderId, newLeaderId)) {
+      LOG.info("{}: change Leader from {} to {}", selfId, leaderId, newLeaderId);
+      leaderId = newLeaderId;
+    }
   }
 
   void becomeLeader() {
-    leaderId = selfId;
+    setLeader(selfId);
   }
 
   public RaftLog getLog() {
@@ -280,7 +285,7 @@ public class ServerState implements Closeable {
 
   public void setRaftConf(long logIndex, RaftConfiguration conf) {
     configurationManager.addConfiguration(logIndex, conf);
-    RaftServerImpl.LOG.info("{}: successfully update the configuration {}",
+    LOG.info("{}: successfully update the configuration {}",
         getSelfId(), conf);
   }
 
@@ -313,7 +318,7 @@ public class ServerState implements Closeable {
   @Override
   public void close() throws IOException {
     stateMachineUpdater.stop();
-    RaftServerImpl.LOG.info("{} closes. The last applied log index is {}",
+    LOG.info("{} closes. The last applied log index is {}",
         getSelfId(), getLastAppliedIndex());
 
     log.close();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 88fddf4..b50d847 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -145,6 +145,7 @@ public abstract class MiniRaftCluster {
     this.properties = new RaftProperties(properties);
     this.parameters = parameters;
 
+    JavaUtils.runRepeatedly(() -> LOG.info("TIMED-PRINT" + printServers()), 10, 10, TimeUnit.SECONDS);
     ExitUtils.disableSystemExit();
   }
 
@@ -470,6 +471,14 @@ public abstract class MiniRaftCluster {
     return createClient(null, g);
   }
 
+  public RaftClient createClientWithLeader() {
+    return createClient(getLeader().getId(), group);
+  }
+
+  public RaftClient createClientWithFollower() {
+    return createClient(getFollowers().get(0).getId(), group);
+  }
+
   public RaftClient createClient(RaftPeerId leaderId) {
     return createClient(leaderId, group);
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 4672b9d..7ad5bd2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -34,6 +34,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Objects;
 
 public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
     extends BaseTest
@@ -103,9 +104,9 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
     }
     Assert.assertNotNull(reply);
     Assert.assertFalse(reply.isSuccess());
-    Assert.assertTrue(reply.isNotLeader());
-    Assert.assertEquals(newLeader,
-        reply.getNotLeaderException().getSuggestedLeader().getId());
+    final NotLeaderException nle = reply.getNotLeaderException();
+    Objects.requireNonNull(nle);
+    Assert.assertEquals(newLeader, nle.getSuggestedLeader().getId());
 
     reply = client.send(new SimpleMessage("m3"));
     Assert.assertTrue(reply.isSuccess());
@@ -148,9 +149,9 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
     }
     Assert.assertNotNull(reply);
     Assert.assertFalse(reply.isSuccess());
-    Assert.assertTrue(reply.isNotLeader());
-    Assert.assertEquals(newLeader,
-        reply.getNotLeaderException().getSuggestedLeader().getId());
+    final NotLeaderException nle = reply.getNotLeaderException();
+    Objects.requireNonNull(nle);
+    Assert.assertEquals(newLeader, nle.getSuggestedLeader().getId());
     Collection<RaftPeer> peers = cluster.getPeers();
     RaftPeer[] peersFromReply = reply.getNotLeaderException().getPeers();
     Assert.assertEquals(peers.size(), peersFromReply.length);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 1fccfc4..9b142fd 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -532,7 +532,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
           final RaftClientRpc sender = client.getClientRpc();
           RaftClientReply reply = sender.sendRequest(cluster.newSetConfigurationRequest(
               client.getId(), leaderId, change.allPeersInNewConf));
-          if (reply.isNotLeader()) {
+          if (reply.getNotLeaderException() != null) {
             gotNotLeader.set(true);
           }
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
index 386d2e7..a141789 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
@@ -36,7 +36,7 @@ class SimulatedClientRpc
   }
 
   @Override
-  public void handleException(RaftPeerId serverId, Exception e) {
+  public void handleException(RaftPeerId serverId, Exception e, boolean shouldClose) {
     // do nothing
   }