You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/27 15:57:09 UTC

[incubator-ratis] branch master updated: RATIS-1184. Move RaftServerMetrics to org.apache.ratis.server.metrics. (#303)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c3a1d56  RATIS-1184. Move RaftServerMetrics to org.apache.ratis.server.metrics. (#303)
c3a1d56 is described below

commit c3a1d5667c6943fef25ff3bbc937aee30f44a5af
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Nov 27 23:56:51 2020 +0800

    RATIS-1184. Move RaftServerMetrics to org.apache.ratis.server.metrics. (#303)
    
    * RATIS-1184. Move RaftServerMetrics to org.apache.ratis.server.metrics.
    
    * Fix checkstyle
    
    * Remove unused import
---
 .../org/apache/ratis/server/impl/LeaderState.java  |  10 +-
 .../org/apache/ratis/server/impl/LogAppender.java  |   4 +-
 .../apache/ratis/server/impl/PendingRequests.java  |   1 +
 .../apache/ratis/server/impl/RaftServerImpl.java   |  29 ++---
 .../{impl => metrics}/RaftServerMetrics.java       | 137 +++++++++++----------
 .../java/org/apache/ratis/LogAppenderTests.java    |  21 ++--
 .../java/org/apache/ratis/MiniRaftCluster.java     |  30 +++--
 .../test/java/org/apache/ratis/RaftBasicTests.java |  85 ++++++-------
 .../test/java/org/apache/ratis/RaftTestUtil.java   |   5 +-
 .../ratis/server/impl/RaftServerTestUtil.java      |  15 +++
 .../server/impl/TestRatisServerMetricsBase.java    |   2 +-
 .../ratis/server/impl/TestRetryCacheMetrics.java   |  14 +--
 .../ratis/statemachine/RaftSnapshotBaseTest.java   |  29 +++--
 .../ratis/TestRaftServerSlownessDetection.java     |  12 +-
 .../apache/ratis/grpc/TestRaftServerWithGrpc.java  |  33 +++--
 .../ratis/grpc/TestRaftSnapshotWithGrpc.java       |   6 +-
 16 files changed, 221 insertions(+), 212 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index e161d6f..30eec01 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.exceptions.NotReplicatedException;
 import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.metrics.LogAppenderMetrics;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
@@ -454,7 +455,7 @@ public class LeaderState {
               server.getRpcSlownessTimeoutMs());
           LogAppender logAppender = server.newLogAppender(this, f);
           peerIdFollowerInfoMap.put(peer.getId(), f);
-          raftServerMetrics.addFollower(f.getPeer());
+          raftServerMetrics.addFollower(peer.getId());
           logAppenderMetrics.addFollowerGauges(f);
           return logAppender;
         }).collect(Collectors.toList());
@@ -988,12 +989,9 @@ public class LeaderState {
 
   /**
    * Record Follower Heartbeat Elapsed Time.
-   * @param follower RaftPeer.
-   * @param elapsedTime Elapsed time in Nanos.
    */
-  void recordFollowerHeartbeatElapsedTime(RaftPeer follower, long elapsedTime) {
-    raftServerMetrics.recordFollowerHeartbeatElapsedTime(follower,
-        elapsedTime);
+  void recordFollowerHeartbeatElapsedTime(RaftPeerId followerId, TimeDuration elapsedTime) {
+    raftServerMetrics.recordFollowerHeartbeatElapsedTime(followerId, elapsedTime.toLong(TimeUnit.NANOSECONDS));
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 6c1f80e..aac6c05 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -565,8 +565,8 @@ public class LogAppender {
     if (follower.isSlow()) {
       server.getStateMachine().leaderEvent().notifyFollowerSlowness(server.getRoleInfoProto());
     }
-    leaderState.recordFollowerHeartbeatElapsedTime(follower.getPeer(),
-        follower.getLastRpcResponseTime().elapsedTime().getDuration());
+    leaderState.recordFollowerHeartbeatElapsedTime(follower.getPeer().getId(),
+        follower.getLastRpcResponseTime().elapsedTime());
   }
 
   public synchronized void notifyAppend() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 5a3c493..9437a2f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -27,6 +27,7 @@ import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 691ed6e..a1dcc36 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -36,6 +36,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerMXBean;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.metrics.LeaderElectionMetrics;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
 import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -140,7 +141,8 @@ public class RaftServerImpl implements RaftServer.Division,
 
     this.jmxAdapter = new RaftServerJmxAdapter();
     this.leaderElectionMetrics = LeaderElectionMetrics.getLeaderElectionMetrics(this);
-    this.raftServerMetrics = RaftServerMetrics.getRaftServerMetrics(this);
+    this.raftServerMetrics = RaftServerMetrics.computeIfAbsentRaftServerMetrics(
+        getMemberId(), () -> commitInfoCache::get, () -> retryCache);
 
     this.startComplete = new AtomicBoolean(false);
   }
@@ -356,7 +358,7 @@ public class RaftServerImpl implements RaftServer.Division,
       try {
         leaderElectionMetrics.unregister();
         raftServerMetrics.unregister();
-        RaftServerMetrics.removeRaftServerMetrics(this);
+        RaftServerMetrics.removeRaftServerMetrics(getMemberId());
       } catch (Exception ignored) {
         LOG.warn("{}: Failed to unregister metric", getMemberId(), ignored);
       }
@@ -672,8 +674,7 @@ public class RaftServerImpl implements RaftServer.Division,
       RaftClientRequest request) throws IOException {
     assertLifeCycleState(LifeCycle.States.RUNNING);
     LOG.debug("{}: receive client request({})", getMemberId(), request);
-    Timer timer = raftServerMetrics.getClientRequestTimer(request);
-    final Timer.Context timerContext = (timer != null) ? timer.time() : null;
+    final Optional<Timer> timer = Optional.ofNullable(raftServerMetrics.getClientRequestTimer(request.getType()));
 
     CompletableFuture<RaftClientReply> replyFuture;
 
@@ -735,30 +736,16 @@ public class RaftServerImpl implements RaftServer.Division,
 
     final RaftClientRequest.Type type = request.getType();
     replyFuture.whenComplete((clientReply, exception) -> {
-      if (clientReply.isSuccess() && timerContext != null) {
-        timerContext.stop();
+      if (clientReply.isSuccess()) {
+        timer.map(Timer::time).ifPresent(Timer.Context::stop);
       }
       if (exception != null || clientReply.getException() != null) {
-        incFailedRequestCount(type);
+        raftServerMetrics.incFailedRequestCount(type);
       }
     });
     return replyFuture;
   }
 
-  private void incFailedRequestCount(RaftClientRequest.Type type) {
-    if (type.is(TypeCase.STALEREAD)) {
-      raftServerMetrics.onFailedClientStaleRead();
-    } else if (type.is(TypeCase.WATCH)) {
-      raftServerMetrics.onFailedClientWatch();
-    } else if (type.is(TypeCase.WRITE)) {
-      raftServerMetrics.onFailedClientWrite();
-    } else if (type.is(TypeCase.READ)) {
-      raftServerMetrics.onFailedClientRead();
-    } else if (type.is(TypeCase.MESSAGESTREAM)) {
-      raftServerMetrics.onFailedClientStream();
-    }
-  }
-
   private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest request) {
     return role.getLeaderState()
         .map(ls -> ls.addWatchReqeust(request))
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetrics.java
similarity index 63%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
rename to ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetrics.java
index 3b24010..4be970a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetrics.java
@@ -16,14 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.ratis.server.impl;
+package org.apache.ratis.server.metrics;
 
 import static org.apache.ratis.server.metrics.RaftLogMetrics.FOLLOWER_APPEND_ENTRIES_LATENCY;
 
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
@@ -31,12 +34,13 @@ import com.codahale.metrics.Timer;
 
 import org.apache.ratis.metrics.MetricRegistryInfo;
 import org.apache.ratis.metrics.RatisMetricRegistry;
-import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftClientRequest.Type;
+import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.metrics.RatisMetrics;
+import org.apache.ratis.server.impl.RetryCache;
 import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.util.Preconditions;
 
@@ -76,30 +80,36 @@ public final class RaftServerMetrics extends RatisMetrics {
   public static final String RATIS_SERVER_FAILED_CLIENT_STREAM_COUNT =
       "numFailedClientStreamOnServer";
 
-  private Map<String, Long> followerLastHeartbeatElapsedTimeMap = new HashMap<>();
-  private CommitInfoCache commitInfoCache;
+  /** Follower Id -> heartbeat elapsed */
+  private final Map<RaftPeerId, Long> followerLastHeartbeatElapsedTimeMap = new HashMap<>();
+  private final Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache;
 
-  private static Map<String, RaftServerMetrics> metricsMap = new ConcurrentHashMap<>();
+  /** id -> metric */
+  private static final Map<RaftGroupMemberId, RaftServerMetrics> METRICS = new ConcurrentHashMap<>();
+  /** id -> key */
+  private static final Map<RaftPeerId, String> PEER_COMMIT_INDEX_GAUGE_KEYS = new ConcurrentHashMap<>();
 
-  public static RaftServerMetrics getRaftServerMetrics(
-      RaftServerImpl raftServer) {
-    RaftServerMetrics serverMetrics = new RaftServerMetrics(raftServer);
-    metricsMap.put(raftServer.getMemberId().toString(), serverMetrics);
-    return serverMetrics;
+  private static String getPeerCommitIndexGaugeKey(RaftPeerId serverId) {
+    return PEER_COMMIT_INDEX_GAUGE_KEYS.computeIfAbsent(serverId,
+        key -> String.format(LEADER_METRIC_PEER_COMMIT_INDEX, key));
   }
 
-  public static void removeRaftServerMetrics(RaftServerImpl raftServer) {
-    String memberId = raftServer.getMemberId().toString();
-    if (metricsMap.containsKey(memberId)) {
-      metricsMap.remove(memberId);
-    }
+  public static RaftServerMetrics computeIfAbsentRaftServerMetrics(RaftGroupMemberId serverId,
+      Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache, Supplier<RetryCache> retryCache) {
+    return METRICS.computeIfAbsent(serverId,
+        key -> new RaftServerMetrics(serverId, commitInfoCache, retryCache));
+  }
+
+  public static void removeRaftServerMetrics(RaftGroupMemberId serverId) {
+    METRICS.remove(serverId);
   }
 
-  private RaftServerMetrics(RaftServerImpl server) {
-    registry = getMetricRegistryForRaftServer(server.getMemberId().toString());
-    commitInfoCache = server.getCommitInfoCache();
-    addPeerCommitIndexGauge(server.getId());
-    addRetryCacheMetric(server);
+  public RaftServerMetrics(RaftGroupMemberId serverId,
+      Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache, Supplier<RetryCache> retryCache) {
+    this.registry = getMetricRegistryForRaftServer(serverId.toString());
+    this.commitInfoCache = commitInfoCache;
+    addPeerCommitIndexGauge(serverId.getPeerId());
+    addRetryCacheMetric(retryCache);
   }
 
   private RatisMetricRegistry getMetricRegistryForRaftServer(String serverId) {
@@ -108,20 +118,18 @@ public final class RaftServerMetrics extends RatisMetrics {
         RATIS_SERVER_METRICS_DESC));
   }
 
-  private void addRetryCacheMetric(RaftServerImpl raftServer) {
-    registry.gauge(RETRY_CACHE_ENTRY_COUNT_METRIC, () -> () -> raftServer.getRetryCache().size());
-    registry.gauge(RETRY_CACHE_HIT_COUNT_METRIC, () -> () -> raftServer.getRetryCache().stats().hitCount());
-    registry.gauge(RETRY_CACHE_HIT_RATE_METRIC, () -> () -> raftServer.getRetryCache().stats().hitRate());
-    registry.gauge(RETRY_CACHE_MISS_COUNT_METRIC, () -> () -> raftServer.getRetryCache().stats().missCount());
-    registry.gauge(RETRY_CACHE_MISS_RATE_METRIC, () -> () -> raftServer.getRetryCache().stats().missRate());
+  private void addRetryCacheMetric(Supplier<RetryCache> retryCache) {
+    registry.gauge(RETRY_CACHE_ENTRY_COUNT_METRIC, () -> () -> retryCache.get().size());
+    registry.gauge(RETRY_CACHE_HIT_COUNT_METRIC  , () -> () -> retryCache.get().stats().hitCount());
+    registry.gauge(RETRY_CACHE_HIT_RATE_METRIC   , () -> () -> retryCache.get().stats().hitRate());
+    registry.gauge(RETRY_CACHE_MISS_COUNT_METRIC , () -> () -> retryCache.get().stats().missCount());
+    registry.gauge(RETRY_CACHE_MISS_RATE_METRIC  , () -> () -> retryCache.get().stats().missRate());
   }
 
   /**
    * Register a follower with this Leader Metrics registry instance.
-   * @param peer {@Link RaftPeer} representing the follower
    */
-  public void addFollower(RaftPeer peer) {
-    String followerName = peer.getId().toString();
+  public void addFollower(RaftPeerId followerName) {
     String followerHbMetricKey = String.format(
         FOLLOWER_LAST_HEARTBEAT_ELAPSED_TIME_METRIC,
         followerName);
@@ -130,43 +138,32 @@ public final class RaftServerMetrics extends RatisMetrics {
     registry.gauge(followerHbMetricKey,
         () -> () -> followerLastHeartbeatElapsedTimeMap.get(followerName));
 
-    addPeerCommitIndexGauge(peer.getId());
+    addPeerCommitIndexGauge(followerName);
   }
 
   /**
    * Register a commit index tracker for the peer in cluster.
    */
   public void addPeerCommitIndexGauge(RaftPeerId peerId) {
-    String followerCommitIndexKey = String.format(
-        LEADER_METRIC_PEER_COMMIT_INDEX, peerId);
-    registry.gauge(followerCommitIndexKey, () -> () -> {
-      RaftProtos.CommitInfoProto commitInfoProto = commitInfoCache.get(peerId);
-      if (commitInfoProto != null) {
-        return commitInfoProto.getCommitIndex();
-      }
-      return 0L;
-    });
+    registry.gauge(getPeerCommitIndexGaugeKey(peerId), () -> () -> Optional.ofNullable(commitInfoCache.get())
+        .map(cache -> cache.apply(peerId))
+        .map(CommitInfoProto::getCommitIndex)
+        .orElse(0L));
   }
 
   /**
    * Get the commit index gauge for the given peer of the server
-   * @param server
-   * @param peerServer
    * @return Metric Gauge holding the value of commit index of the peer
    */
   @VisibleForTesting
-  public static Gauge getPeerCommitIndexGauge(RaftServerImpl server,
-      RaftServerImpl peerServer) {
+  public static Gauge getPeerCommitIndexGauge(RaftGroupMemberId serverId, RaftPeerId peerId) {
 
-    RaftServerMetrics serverMetrics =
-        metricsMap.get(server.getMemberId().toString());
+    final RaftServerMetrics serverMetrics = METRICS.get(serverId);
     if (serverMetrics == null) {
       return null;
     }
 
-    String followerCommitIndexKey = String.format(
-        LEADER_METRIC_PEER_COMMIT_INDEX,
-        peerServer.getPeer().getId().toString());
+    final String followerCommitIndexKey = getPeerCommitIndexGaugeKey(peerId);
 
     SortedMap<String, Gauge> map =
         serverMetrics.registry.getGauges((s, metric) ->
@@ -178,12 +175,11 @@ public final class RaftServerMetrics extends RatisMetrics {
 
   /**
    * Record heartbeat elapsed time for a follower within a Raft group.
-   * @param peer {@Link RaftPeer} representing the follower.
-   * @param elapsedTime Elapsed time in Nanos.
+   * @param followerId the follower id.
+   * @param elapsedTimeNs Elapsed time in Nanos.
    */
-  public void recordFollowerHeartbeatElapsedTime(RaftPeer peer, long elapsedTime) {
-    followerLastHeartbeatElapsedTimeMap.put(peer.getId().toString(),
-        elapsedTime);
+  public void recordFollowerHeartbeatElapsedTime(RaftPeerId followerId, long elapsedTimeNs) {
+    followerLastHeartbeatElapsedTimeMap.put(followerId, elapsedTimeNs);
   }
 
   public Timer getFollowerAppendEntryTimer(boolean isHeartbeat) {
@@ -198,13 +194,13 @@ public final class RaftServerMetrics extends RatisMetrics {
     return registry.counter(counterName);
   }
 
-  public Timer getClientRequestTimer(RaftClientRequest request) {
+  public Timer getClientRequestTimer(Type request) {
     if (request.is(TypeCase.READ)) {
       return getTimer(RAFT_CLIENT_READ_REQUEST);
     } else if (request.is(TypeCase.STALEREAD)) {
       return getTimer(RAFT_CLIENT_STALE_READ_REQUEST);
     } else if (request.is(TypeCase.WATCH)) {
-      String watchType = RaftClientRequest.Type.toString(request.getType().getWatch().getReplication());
+      String watchType = Type.toString(request.getWatch().getReplication());
       return getTimer(String.format(RAFT_CLIENT_WATCH_REQUEST, watchType));
     } else if (request.is(TypeCase.WRITE)) {
       return getTimer(RAFT_CLIENT_WRITE_REQUEST);
@@ -216,27 +212,27 @@ public final class RaftServerMetrics extends RatisMetrics {
     registry.counter(REQUEST_QUEUE_LIMIT_HIT_COUNTER).inc();
   }
 
-  void addNumPendingRequestsGauge(Gauge queueSize) {
+  public void addNumPendingRequestsGauge(Gauge queueSize) {
     registry.gauge(REQUEST_QUEUE_SIZE, () -> queueSize);
   }
 
-  boolean removeNumPendingRequestsGauge() {
+  public boolean removeNumPendingRequestsGauge() {
     return registry.remove(REQUEST_QUEUE_SIZE);
   }
 
-  void addNumPendingRequestsByteSize(Gauge byteSize) {
+  public void addNumPendingRequestsByteSize(Gauge byteSize) {
     registry.gauge(REQUEST_BYTE_SIZE, () -> byteSize);
   }
 
-  boolean removeNumPendingRequestsByteSize() {
+  public boolean removeNumPendingRequestsByteSize() {
     return registry.remove(REQUEST_BYTE_SIZE);
   }
 
-  void onRequestByteSizeLimitHit() {
+  public void onRequestByteSizeLimitHit() {
     registry.counter(REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER).inc();
   }
 
-  void onResourceLimitHit() {
+  public void onResourceLimitHit() {
     registry.counter(RESOURCE_LIMIT_HIT_COUNTER).inc();
   }
 
@@ -259,6 +255,21 @@ public final class RaftServerMetrics extends RatisMetrics {
   void onFailedClientStream() {
     registry.counter(RATIS_SERVER_FAILED_CLIENT_STREAM_COUNT).inc();
   }
+
+  public void incFailedRequestCount(Type type) {
+    if (type.is(TypeCase.STALEREAD)) {
+      onFailedClientStaleRead();
+    } else if (type.is(TypeCase.WATCH)) {
+      onFailedClientWatch();
+    } else if (type.is(TypeCase.WRITE)) {
+      onFailedClientWrite();
+    } else if (type.is(TypeCase.READ)) {
+      onFailedClientRead();
+    } else if (type.is(TypeCase.MESSAGESTREAM)) {
+      onFailedClientStream();
+    }
+  }
+
   public RatisMetricRegistry getRegistry() {
     return registry;
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index 1eafb7f..f710ed1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -28,12 +28,12 @@ import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.LogAppender;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerMetrics;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.impl.ServerProtoUtils;
-import org.apache.ratis.server.impl.ServerState;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
@@ -130,7 +130,7 @@ public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
     // Start a 3 node Ratis ring.
     final MiniRaftCluster cluster = newCluster(3);
     cluster.start();
-    RaftServerImpl leaderServer = waitForLeader(cluster);
+    final RaftServer.Division leaderServer = waitForLeader(cluster);
 
     // Write 10 messages to leader.
     try(RaftClient client = cluster.createClient(leaderServer.getId())) {
@@ -141,15 +141,14 @@ public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
       throw e;
     }
 
-    RatisMetricRegistry ratisMetricRegistry =
-        RaftServerMetrics.getRaftServerMetrics(leaderServer).getRegistry();
+    final RatisMetricRegistry ratisMetricRegistry = RaftServerTestUtil.getRaftServerMetrics(leaderServer).getRegistry();
 
     // Get all last_heartbeat_elapsed_time metric gauges. Should be equal to number of followers.
     SortedMap<String, Gauge> heartbeatElapsedTimeGauges = ratisMetricRegistry.getGauges((s, metric) ->
         s.contains("lastHeartbeatElapsedTime"));
     assertTrue(heartbeatElapsedTimeGauges.size() == 2);
 
-    for (RaftServerImpl followerServer : cluster.getFollowers()) {
+    for (RaftServer.Division followerServer : cluster.getFollowers()) {
       String followerId = followerServer.getId().toString();
       Gauge metric = heartbeatElapsedTimeGauges.entrySet().parallelStream().filter(e -> e.getKey().contains(
           followerId)).iterator().next().getValue();
@@ -158,7 +157,7 @@ public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
       // Metric in nanos > 0.
       assertTrue((long)metric.getValue() > 0);
       // Try to get Heartbeat metrics for follower.
-      RaftServerMetrics followerMetrics = RaftServerMetrics.getRaftServerMetrics(followerServer);
+      final RaftServerMetrics followerMetrics = RaftServerTestUtil.getRaftServerMetrics(followerServer);
       // Metric should not exist. It only exists in leader.
       assertTrue(followerMetrics.getRegistry().getGauges((s, m) -> s.contains("lastHeartbeatElapsedTime")).isEmpty());
       for (boolean heartbeat : new boolean[] { true, false }) {
@@ -208,8 +207,8 @@ public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
       }
     }
 
-    final ServerState leaderState = cluster.getLeader().getState();
-    final RaftLog leaderLog = leaderState.getLog();
+    final RaftServer.Division leader = cluster.getLeader();
+    final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(cluster.getLeader());
     final EnumMap<LogEntryBodyCase, AtomicLong> counts = RaftTestUtil.countEntries(leaderLog);
     LOG.info("counts = " + counts);
     Assert.assertEquals(6 * numMsgs * numClients, counts.get(LogEntryBodyCase.STATEMACHINELOGENTRY).get());
@@ -217,6 +216,6 @@ public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
     final LogEntryProto last = RaftTestUtil.getLastEntry(LogEntryBodyCase.STATEMACHINELOGENTRY, leaderLog);
     LOG.info("last = " + ServerProtoUtils.toLogEntryString(last));
     Assert.assertNotNull(last);
-    Assert.assertTrue(last.getIndex() <= leaderState.getLastAppliedIndex());
+    Assert.assertTrue(last.getIndex() <= RaftServerTestUtil.getLastAppliedIndex(leader));
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index c136add..c40d1d2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -505,8 +505,8 @@ public abstract class MiniRaftCluster implements Closeable {
     return b.toString();
   }
 
-  public RaftServerImpl getLeaderAndSendFirstMessage(boolean ignoreException) throws IOException {
-    final RaftServerImpl leader = getLeader();
+  public RaftServer.Division getLeaderAndSendFirstMessage(boolean ignoreException) throws IOException {
+    final RaftServer.Division leader = getLeader();
     try(RaftClient client = createClient(leader.getId())) {
       client.io().send(new RaftTestUtil.SimpleMessage("first msg to make leader ready"));
     } catch (IOException e) {
@@ -590,7 +590,7 @@ public abstract class MiniRaftCluster implements Closeable {
   }
 
   boolean isLeader(String leaderId) {
-    final RaftServerImpl leader = getLeader();
+    final RaftServer.Division leader = getLeader();
     return leader != null && leader.getId().toString().equals(leaderId);
   }
 
@@ -600,6 +600,12 @@ public abstract class MiniRaftCluster implements Closeable {
         .collect(Collectors.toList());
   }
 
+  public List<RaftServer.Division> getFollowerDivisions() {
+    return getServerAliveStream()
+        .filter(RaftServerImpl::isFollower)
+        .collect(Collectors.toList());
+  }
+
   public Collection<RaftServerProxy> getServers() {
     return servers.values();
   }
@@ -613,6 +619,10 @@ public abstract class MiniRaftCluster implements Closeable {
     return CollectionUtils.as(getServers(), this::getRaftServerImpl);
   }
 
+  public Iterable<RaftServer.Division> iterateDivisions() {
+    return CollectionUtils.as(getServers(), this::getRaftServerImpl);
+  }
+
   private Stream<RaftServerImpl> getServerStream(RaftGroupId groupId) {
     final Stream<RaftServerProxy> stream = getRaftServerProxyStream(groupId);
     return groupId != null?
@@ -636,22 +646,26 @@ public abstract class MiniRaftCluster implements Closeable {
     return servers.get(id);
   }
 
+  public RaftServer.Division getDivision(RaftPeerId id) {
+    return getRaftServerImpl(servers.get(id));
+  }
+
+  public RaftServer.Division getDivision(RaftPeerId id, RaftGroupId groupId) {
+    return RaftServerTestUtil.getRaftServerImpl(servers.get(id), groupId);
+  }
+
   public RaftServerImpl getRaftServerImpl(RaftPeerId id) {
     return getRaftServerImpl(servers.get(id));
   }
 
   public RaftServerImpl getRaftServerImpl(RaftPeerId id, RaftGroupId groupId) {
-    return getRaftServerImpl(servers.get(id), groupId);
+    return RaftServerTestUtil.getRaftServerImpl(servers.get(id), groupId);
   }
 
   public RaftServerImpl getRaftServerImpl(RaftServerProxy proxy) {
     return RaftServerTestUtil.getRaftServerImpl(proxy, getGroupId());
   }
 
-  public RaftServerImpl getRaftServerImpl(RaftServerProxy proxy, RaftGroupId groupId) {
-    return RaftServerTestUtil.getRaftServerImpl(proxy, groupId);
-  }
-
   public List<RaftPeer> getPeers() {
     return toRaftPeers(getServers());
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 3486f90..e183dce 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -18,8 +18,6 @@
 package org.apache.ratis;
 
 import com.codahale.metrics.Gauge;
-
-import java.util.Set;
 import org.apache.log4j.Level;
 import org.apache.ratis.RaftTestUtil.SimpleMessage;
 import org.apache.ratis.client.RaftClient;
@@ -27,14 +25,18 @@ import org.apache.ratis.client.impl.RaftClientTestUtil;
 import org.apache.ratis.metrics.MetricRegistries;
 import org.apache.ratis.metrics.MetricRegistryInfo;
 import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.*;
+import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.impl.RetryCacheTestUtil;
 import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Log4jUtils;
@@ -47,6 +49,7 @@ import org.slf4j.Logger;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -60,11 +63,11 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
+import static org.apache.ratis.metrics.RatisMetrics.RATIS_APPLICATION_NAME_METRICS;
 import static org.apache.ratis.server.impl.StateMachineMetrics.RATIS_STATEMACHINE_METRICS;
 import static org.apache.ratis.server.impl.StateMachineMetrics.RATIS_STATEMACHINE_METRICS_DESC;
 import static org.apache.ratis.server.impl.StateMachineMetrics.STATEMACHINE_APPLIED_INDEX_GAUGE;
 import static org.apache.ratis.server.impl.StateMachineMetrics.STATEMACHINE_APPLY_COMPLETED_GAUGE;
-import static org.apache.ratis.metrics.RatisMetrics.RATIS_APPLICATION_NAME_METRICS;
 
 public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
     extends BaseTest
@@ -116,8 +119,8 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
     for (RaftServer s : cluster.getServers()) {
       cluster.restartServer(s.getId(), false);
     }
-    RaftServerImpl leader = waitForLeader(cluster);
-    final long term = leader.getState().getCurrentTerm();
+    RaftServer.Division leader = waitForLeader(cluster);
+    final long term = RaftServerTestUtil.getCurrentTerm(leader);
 
     final CompletableFuture<Void> killAndRestartFollower = killAndRestartServer(
         cluster.getFollowers().get(0).getId(), 0, 1000, cluster, LOG);
@@ -162,12 +165,11 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
     killAndRestartFollower.join();
     killAndRestartLeader.join();
 
-    for(RaftServerProxy server : cluster.getServers()) {
-      final RaftServerImpl impl = RaftServerTestUtil.getRaftServerImpl(server, cluster.getGroupId());
-      if (impl.isAlive()) {
+
+    final List<RaftServer.Division> divisions = cluster.getServerAliveStream().collect(Collectors.toList());
+    for(RaftServer.Division impl: divisions) {
         JavaUtils.attempt(() -> RaftTestUtil.assertLogEntries(impl, term, messages),
             5, TimeDuration.valueOf(1, TimeUnit.SECONDS), impl.getId() + " assertLogEntries", LOG);
-      }
     }
   }
 
@@ -178,23 +180,22 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
   }
 
   void runTestOldLeaderCommit(CLUSTER cluster) throws Exception {
-    final RaftServerImpl leader = waitForLeader(cluster);
+    final RaftServer.Division leader = waitForLeader(cluster);
     final RaftPeerId leaderId = leader.getId();
-    final long term = leader.getState().getCurrentTerm();
+    final long term = RaftServerTestUtil.getCurrentTerm(leader);
 
-    List<RaftServerImpl> followers = cluster.getFollowers();
-    final List<RaftServerImpl> followersToSendLog = followers.subList(0, followers.size() / 2);
+    final List<RaftServer.Division> followers = cluster.getFollowerDivisions();
+    final List<RaftServer.Division> followersToSendLog = followers.subList(0, followers.size() / 2);
     for (int i = followers.size() / 2; i < NUM_SERVERS - 1; i++) {
-      RaftServerImpl follower = followers.get(i);
-      cluster.killServer(follower.getId());
+      cluster.killServer(followers.get(i).getId());
     }
 
     SimpleMessage[] messages = SimpleMessage.create(1);
     RaftTestUtil.sendMessageInNewThread(cluster, leaderId, messages);
 
     Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) + 100);
-    for (RaftServerImpl followerToSendLog : followersToSendLog) {
-      RaftLog followerLog = followerToSendLog.getState().getLog();
+    for (RaftServer.Division followerToSendLog : followersToSendLog) {
+      RaftLog followerLog = RaftServerTestUtil.getRaftLog(followerToSendLog);
       Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, messages));
     }
 
@@ -202,9 +203,9 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
     cluster.killServer(leaderId);
 
     for (int i = followers.size() / 2; i < NUM_SERVERS - 1; i++) {
-      RaftServerImpl follower = followers.get(i);
-      LOG.info(String.format("restarting follower: %s", follower.getId().toString()));
-      cluster.restartServer(follower.getId(), false);
+      final RaftPeerId followerId = followers.get(i).getId();
+      LOG.info(String.format("restarting follower: %s", followerId));
+      cluster.restartServer(followerId, false);
     }
 
     Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
@@ -227,12 +228,11 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
   void runTestOldLeaderNotCommit(CLUSTER cluster) throws Exception {
     final RaftPeerId leaderId = waitForLeader(cluster).getId();
 
-    List<RaftServerImpl> followers = cluster.getFollowers();
-    final RaftServerImpl followerToCommit = followers.get(0);
+    final List<RaftServer.Division> followers = cluster.getFollowerDivisions();
+    final RaftServer.Division followerToCommit = followers.get(0);
     try {
       for (int i = 1; i < NUM_SERVERS - 1; i++) {
-        RaftServerImpl follower = followers.get(i);
-        cluster.killServer(follower.getId());
+        cluster.killServer(followers.get(i).getId());
       }
     } catch (IndexOutOfBoundsException e) {
       throw new org.junit.AssumptionViolatedException("The assumption is follower.size() = NUM_SERVERS - 1, "
@@ -243,14 +243,13 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
     RaftTestUtil.sendMessageInNewThread(cluster, leaderId, messages);
 
     Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) + 100);
-    RaftTestUtil.logEntriesContains(followerToCommit.getState().getLog(), messages);
+    RaftTestUtil.logEntriesContains(RaftServerTestUtil.getRaftLog(followerToCommit), messages);
 
     cluster.killServer(leaderId);
     cluster.killServer(followerToCommit.getId());
 
     for (int i = 1; i < NUM_SERVERS - 1; i++) {
-      RaftServerImpl follower = followers.get(i);
-      cluster.restartServer(follower.getId(), false );
+      cluster.restartServer(followers.get(i).getId(), false );
     }
     waitForLeader(cluster);
     Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) + 100);
@@ -372,7 +371,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
         if (last != previousLastStep) {
           previousLastStep = last;
         } else {
-          final RaftServerImpl leader = cluster.getLeader();
+          final RaftServer.Division leader = cluster.getLeader();
           LOG.info("NO PROGRESS at " + last + ", try to restart leader=" + leader);
           if (leader != null) {
             try {
@@ -405,7 +404,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
       count++;
 
       try {
-        RaftServerImpl leader = cluster.getLeader();
+        RaftServer.Division leader = cluster.getLeader();
         if (leader != null) {
           RaftTestUtil.changeLeader(cluster, leader.getId());
         }
@@ -455,11 +454,8 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
 
   public static void testStateMachineMetrics(boolean async,
       MiniRaftCluster cluster, Logger LOG) throws Exception {
-    RaftServerImpl leader = waitForLeader(cluster);
+    RaftServer.Division leader = waitForLeader(cluster);
     try (final RaftClient client = cluster.createClient()) {
-
-      Assert.assertTrue(leader.isLeader());
-
       Gauge appliedIndexGauge = getStatemachineGaugeWithName(leader,
           STATEMACHINE_APPLIED_INDEX_GAUGE);
       Gauge smAppliedIndexGauge = getStatemachineGaugeWithName(leader,
@@ -488,27 +484,24 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
   }
 
   private static void checkFollowerCommitLagsLeader(MiniRaftCluster cluster) {
-    List<RaftServerImpl> followers = cluster.getFollowers();
-    RaftServerImpl leader = cluster.getLeader();
+    final List<RaftServer.Division> followers = cluster.getFollowerDivisions();
+    final RaftGroupMemberId leader = cluster.getLeader().getMemberId();
 
-    Gauge leaderCommitGauge = RaftServerMetrics
-        .getPeerCommitIndexGauge(leader, leader);
+    Gauge leaderCommitGauge = RaftServerMetrics.getPeerCommitIndexGauge(leader, leader.getPeerId());
 
-    for (RaftServerImpl follower : followers) {
-      Gauge followerCommitGauge = RaftServerMetrics
-          .getPeerCommitIndexGauge(leader, follower);
+    for (RaftServer.Division f : followers) {
+      final RaftGroupMemberId follower = f.getMemberId();
+      Gauge followerCommitGauge = RaftServerMetrics.getPeerCommitIndexGauge(leader, follower.getPeerId());
       Assert.assertTrue((Long)leaderCommitGauge.getValue() >=
           (Long)followerCommitGauge.getValue());
-      Gauge followerMetric = RaftServerMetrics
-          .getPeerCommitIndexGauge(follower, follower);
+      Gauge followerMetric = RaftServerMetrics.getPeerCommitIndexGauge(follower, follower.getPeerId());
       System.out.println(followerCommitGauge.getValue());
       System.out.println(followerMetric.getValue());
       Assert.assertTrue((Long)followerCommitGauge.getValue()  <= (Long)followerMetric.getValue());
     }
   }
 
-  private static Gauge getStatemachineGaugeWithName(RaftServerImpl server,
-      String gaugeName) {
+  private static Gauge getStatemachineGaugeWithName(RaftServer.Division server, String gaugeName) {
 
     MetricRegistryInfo info = new MetricRegistryInfo(server.getMemberId().toString(),
         RATIS_APPLICATION_NAME_METRICS,
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 6dd124b..118293e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -31,6 +31,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
 import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
 import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
@@ -232,9 +233,9 @@ public interface RaftTestUtil {
     }
   }
 
-  static void assertLogEntries(RaftServerImpl server, long expectedTerm, SimpleMessage... expectedMessages) {
+  static void assertLogEntries(RaftServer.Division server, long expectedTerm, SimpleMessage... expectedMessages) {
     LOG.info("checking raft log for {}", server.getMemberId());
-    final RaftLog log = server.getState().getLog();
+    final RaftLog log = RaftServerTestUtil.getRaftLog(server);
     try {
       RaftTestUtil.assertLogEntries(log, expectedTerm, expectedMessages);
     } catch (AssertionError e) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 56b7e60..f0e527f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -28,6 +28,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.DataStreamMap;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.util.JavaUtils;
@@ -107,6 +108,10 @@ public class RaftServerTestUtil {
     return ((RaftServerImpl)server).getState().getLastAppliedIndex();
   }
 
+  public static long getLatestInstalledSnapshotIndex(RaftServer.Division server) {
+    return ((RaftServerImpl)server).getState().getLatestInstalledSnapshotIndex();
+  }
+
   public static long getRetryCacheSize(RaftServer.Division server) {
     return ((RaftServerImpl)server).getRetryCache().size();
   }
@@ -168,4 +173,14 @@ public class RaftServerTestUtil {
   public static DataStreamMap newDataStreamMap(Object name) {
     return new DataStreamMapImpl(name);
   }
+
+  public static void shutdown(RaftServer.Division server) {
+    ((RaftServerImpl)server).shutdown();
+  }
+
+  public static void assertLostMajorityHeartbeatsRecently(RaftServer.Division leader) {
+    final FollowerState f = ((RaftServerImpl)leader).getRole().getFollowerState().orElse(null);
+    Assert.assertNotNull(f);
+    Assert.assertTrue(f.lostMajorityHeartbeatsRecently());
+  }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
index 9a3380b..7a1334d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
@@ -17,7 +17,7 @@
  */
 package org.apache.ratis.server.impl;
 
-import static org.apache.ratis.server.impl.RaftServerMetrics.RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
index 208f34a..777918c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
@@ -18,10 +18,8 @@
 
 package org.apache.ratis.server.impl;
 
-import static org.apache.ratis.server.impl.RaftServerMetrics.*;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.*;
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.protocol.ClientInvocationId;
@@ -29,6 +27,7 @@ import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
 import org.apache.ratis.util.TimeDuration;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -44,19 +43,14 @@ public class TestRetryCacheMetrics {
 
     @BeforeClass
     public static void setUp() {
-      RaftServerImpl raftServer = mock(RaftServerImpl.class);
-
       RaftGroupId raftGroupId = RaftGroupId.randomId();
       RaftPeerId raftPeerId = RaftPeerId.valueOf("TestId");
       RaftGroupMemberId raftGroupMemberId = RaftGroupMemberId
           .valueOf(raftPeerId, raftGroupId);
-      when(raftServer.getMemberId()).thenReturn(raftGroupMemberId);
-
       retryCache = new RetryCache(TimeDuration.valueOf(60, TimeUnit.SECONDS));
-      when(raftServer.getRetryCache()).thenReturn(retryCache);
 
-      RaftServerMetrics raftServerMetrics = RaftServerMetrics
-          .getRaftServerMetrics(raftServer);
+      final RaftServerMetrics raftServerMetrics = RaftServerMetrics.computeIfAbsentRaftServerMetrics(
+          raftGroupMemberId, () -> null, () -> retryCache);
       ratisMetricRegistry = raftServerMetrics.getRegistry();
     }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 03a186b..89ab2da 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -37,7 +37,6 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.storage.RaftStorageDirectory;
@@ -76,9 +75,9 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
   private static final int SNAPSHOT_TRIGGER_THRESHOLD = 10;
 
   public static List<File> getSnapshotFiles(MiniRaftCluster cluster, long startIndex, long endIndex) {
-    final RaftServerImpl leader = cluster.getLeader();
+    final RaftServer.Division leader = cluster.getLeader();
     final SimpleStateMachineStorage storage = SimpleStateMachine4Testing.get(leader).getStateMachineStorage();
-    final long term = leader.getState().getCurrentTerm();
+    final long term = RaftServerTestUtil.getCurrentTerm(leader);
     return LongStream.range(startIndex, endIndex)
         .mapToObj(i -> storage.getSnapshotFile(term, i))
         .collect(Collectors.toList());
@@ -86,8 +85,8 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
 
 
   public static void assertLeaderContent(MiniRaftCluster cluster) throws Exception {
-    final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
-    final RaftLog leaderLog = leader.getState().getLog();
+    final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+    final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(leader);
     final long lastIndex = leaderLog.getLastEntryTermIndex().getIndex();
     final LogEntryProto e = leaderLog.get(lastIndex);
     Assert.assertTrue(e.hasMetadataEntry());
@@ -151,7 +150,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
       }
     }
 
-    final long nextIndex = cluster.getLeader().getState().getLog().getNextIndex();
+    final long nextIndex = RaftServerTestUtil.getRaftLog(cluster.getLeader()).getNextIndex();
     LOG.info("nextIndex = {}", nextIndex);
     // wait for the snapshot to be done
     final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
@@ -199,10 +198,9 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
       }
 
       // wait for the snapshot to be done
-      RaftStorageDirectory storageDirectory = cluster.getLeader().getState()
-          .getStorage().getStorageDir();
+      RaftStorageDirectory storageDirectory = RaftServerTestUtil.getRaftStorage(cluster.getLeader()).getStorageDir();
 
-      final long nextIndex = cluster.getLeader().getState().getLog().getNextIndex();
+      final long nextIndex = RaftServerTestUtil.getRaftLog(cluster.getLeader()).getNextIndex();
       LOG.info("nextIndex = {}", nextIndex);
       final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
       JavaUtils.attemptRepeatedly(() -> {
@@ -239,9 +237,9 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
       cluster.setConfiguration(change.allPeersInNewConf);
 
       for (String newPeer : newPeers) {
-        RaftServerImpl s = cluster.getRaftServerImpl(RaftPeerId.valueOf(newPeer));
+        final RaftServer.Division s = cluster.getDivision(RaftPeerId.valueOf(newPeer));
         SimpleStateMachine4Testing simpleStateMachine = SimpleStateMachine4Testing.get(s);
-        Assert.assertTrue(simpleStateMachine.getLifeCycleState() == LifeCycle.State.RUNNING);
+        Assert.assertSame(LifeCycle.State.RUNNING, simpleStateMachine.getLifeCycleState());
       }
 
       // Verify installSnapshot counter on leader before restart.
@@ -262,18 +260,19 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
     }
   }
 
-  protected void verifyInstallSnapshotMetric(RaftServerImpl leader) {
-    Counter installSnapshotCounter = leader.getRaftServerMetrics().getCounter(LOG_APPENDER_INSTALL_SNAPSHOT_METRIC);
+  protected void verifyInstallSnapshotMetric(RaftServer.Division leader) {
+    final Counter installSnapshotCounter = RaftServerTestUtil.getRaftServerMetrics(leader)
+        .getCounter(LOG_APPENDER_INSTALL_SNAPSHOT_METRIC);
     Assert.assertNotNull(installSnapshotCounter);
     Assert.assertTrue(installSnapshotCounter.getCount() >= 1);
   }
 
-  private static void verifyTakeSnapshotMetric(RaftServerImpl leader) {
+  private static void verifyTakeSnapshotMetric(RaftServer.Division leader) {
     Timer timer = getTakeSnapshotTimer(leader);
     Assert.assertTrue(timer.getCount() > 0);
   }
 
-  private static Timer getTakeSnapshotTimer(RaftServerImpl leader) {
+  private static Timer getTakeSnapshotTimer(RaftServer.Division leader) {
     MetricRegistryInfo info = new MetricRegistryInfo(leader.getMemberId().toString(),
         RATIS_APPLICATION_NAME_METRICS,
         RATIS_STATEMACHINE_METRICS, RATIS_STATEMACHINE_METRICS_DESC);
diff --git a/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java b/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
index 5be8c78..2888fe3 100644
--- a/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
+++ b/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
@@ -23,8 +23,7 @@ import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerMetrics;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
@@ -85,13 +84,12 @@ public class TestRaftServerSlownessDetection extends BaseTest {
 
   @Test
   public void testSlownessDetection() throws Exception {
-    RaftServerImpl leaderServer = RaftTestUtil.waitForLeader(cluster);
+    RaftServer.Division leaderServer = RaftTestUtil.waitForLeader(cluster);
     long slownessTimeout = RaftServerConfigKeys.Rpc
         .slownessTimeout(cluster.getProperties()).toIntExact(TimeUnit.MILLISECONDS);
-    RaftServerImpl failedFollower = cluster.getFollowers().get(0);
+    RaftServer.Division failedFollower = cluster.getFollowers().get(0);
 
-    RatisMetricRegistry ratisMetricRegistry =
-        RaftServerMetrics.getRaftServerMetrics(leaderServer).getRegistry();
+    final RatisMetricRegistry ratisMetricRegistry = RaftServerTestUtil.getRaftServerMetrics(leaderServer).getRegistry();
     SortedMap<String, Gauge> heartbeatElapsedTimeGauges =
         ratisMetricRegistry.getGauges((s, metric) ->
             s.contains("lastHeartbeatElapsedTime"));
@@ -110,7 +108,7 @@ public class TestRaftServerSlownessDetection extends BaseTest {
     Assert.assertTrue(followerHeartBeatElapsedMetricNew > followerHeartBeatElapsedMetric);
 
     // Followers should not get any failed not notification
-    for (RaftServerImpl followerServer : cluster.getFollowers()) {
+    for (RaftServer.Division followerServer : cluster.getFollowers()) {
       Assert.assertNull(SimpleStateMachine4Testing.get(followerServer).getSlownessInfo());
     }
     // the leader should get notification that the follower has failed now
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index fda250c..da05fe0 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -17,14 +17,14 @@
  */
 package org.apache.ratis.grpc;
 
-import static org.apache.ratis.server.impl.RaftServerMetrics.RAFT_CLIENT_READ_REQUEST;
-import static org.apache.ratis.server.impl.RaftServerMetrics.RAFT_CLIENT_STALE_READ_REQUEST;
-import static org.apache.ratis.server.impl.RaftServerMetrics.RAFT_CLIENT_WATCH_REQUEST;
-import static org.apache.ratis.server.impl.RaftServerMetrics.RAFT_CLIENT_WRITE_REQUEST;
-import static org.apache.ratis.server.impl.RaftServerMetrics.REQUEST_QUEUE_LIMIT_HIT_COUNTER;
-import static org.apache.ratis.server.impl.RaftServerMetrics.REQUEST_BYTE_SIZE;
-import static org.apache.ratis.server.impl.RaftServerMetrics.REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER;
-import static org.apache.ratis.server.impl.RaftServerMetrics.RESOURCE_LIMIT_HIT_COUNTER;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.RAFT_CLIENT_READ_REQUEST;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.RAFT_CLIENT_STALE_READ_REQUEST;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.RAFT_CLIENT_WATCH_REQUEST;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.RAFT_CLIENT_WRITE_REQUEST;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.REQUEST_QUEUE_LIMIT_HIT_COUNTER;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.REQUEST_BYTE_SIZE;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.RESOURCE_LIMIT_HIT_COUNTER;
 
 import com.codahale.metrics.Gauge;
 import org.apache.commons.lang3.RandomStringUtils;
@@ -51,7 +51,7 @@ import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerMetrics;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.impl.ServerImplUtils;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
@@ -107,7 +107,7 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
     RaftServerConfigKeys.setStorageDir(p, Collections.singletonList(cluster.getStorageDir(leaderId)));
     ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), gid -> stateMachine, p, null);
     // Close the server rpc for leader so that new raft server can be bound to it.
-    cluster.getLeader().getServerRpc().close();
+    RaftServerTestUtil.getServerRpc(cluster.getLeader()).close();
 
     // Create a raft server proxy with server rpc bound to same address as
     // the leader. This step would fail as the raft storage has been locked by
@@ -229,9 +229,8 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
       client.async().send(new SimpleMessage(message));
 
 
-      final SortedMap<String, Gauge> gaugeMap =
-              cluster.getLeader().getRaftServerMetrics().getRegistry()
-                      .getGauges((s, metric) -> s.contains(REQUEST_BYTE_SIZE));
+      final SortedMap<String, Gauge> gaugeMap = RaftServerTestUtil.getRaftServerMetrics(cluster.getLeader())
+          .getRegistry().getGauges((s, metric) -> s.contains(REQUEST_BYTE_SIZE));
 
       RaftTestUtil.waitFor(() -> (int) gaugeMap.get(gaugeMap.firstKey()).getValue() == message.length(),
               300, 5000);
@@ -243,8 +242,8 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
       }
 
       // Because we have passed 11 requests, and the element queue size is 10.
-      RaftTestUtil.waitFor(() -> cluster.getLeader().getRaftServerMetrics().getCounter(REQUEST_QUEUE_LIMIT_HIT_COUNTER)
-              .getCount() == 1, 300, 5000);
+      RaftTestUtil.waitFor(() -> RaftServerTestUtil.getRaftServerMetrics(cluster.getLeader())
+          .getCounter(REQUEST_QUEUE_LIMIT_HIT_COUNTER).getCount() == 1, 300, 5000);
 
       stateMachine.unblockFlushStateMachineData();
 
@@ -255,10 +254,10 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
       client.async().send(new SimpleMessage(RandomStringUtils.random(120, true, false)));
       clients.add(client);
 
-      RaftTestUtil.waitFor(() -> cluster.getLeader().getRaftServerMetrics()
+      RaftTestUtil.waitFor(() -> RaftServerTestUtil.getRaftServerMetrics(cluster.getLeader())
               .getCounter(REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER).getCount() == 1, 300, 5000);
 
-      Assert.assertEquals(2, cluster.getLeader().getRaftServerMetrics()
+      Assert.assertEquals(2, RaftServerTestUtil.getRaftServerMetrics(cluster.getLeader())
               .getCounter(RESOURCE_LIMIT_HIT_COUNTER).getCount());
     } finally {
       for (RaftClient client : clients) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java
index c5ae2ee..c128d28 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -23,7 +23,7 @@ import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.metrics.MetricRegistries;
 import org.apache.ratis.metrics.MetricRegistryInfo;
 import org.apache.ratis.metrics.RatisMetricRegistry;
-import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.statemachine.RaftSnapshotBaseTest;
 import org.junit.Assert;
 
@@ -36,7 +36,7 @@ public class TestRaftSnapshotWithGrpc extends RaftSnapshotBaseTest {
   }
 
   @Override
-  protected void verifyInstallSnapshotMetric(RaftServerImpl leader) {
+  protected void verifyInstallSnapshotMetric(RaftServer.Division leader) {
     MetricRegistryInfo info = new MetricRegistryInfo(leader.getMemberId().toString(),
         "ratis_grpc", "log_appender", "Metrics for Ratis Grpc Log Appender");
     Optional<RatisMetricRegistry> metricRegistry = MetricRegistries.global().get(info);