You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/02 01:21:49 UTC

[incubator-ratis] branch master updated: RATIS-1191. Define an interface for FollowerInfo. (#311)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a3589f  RATIS-1191. Define an interface for FollowerInfo. (#311)
9a3589f is described below

commit 9a3589f9bbe74bfd612e33abe9b0313f719d776f
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Dec 2 09:21:03 2020 +0800

    RATIS-1191. Define an interface for FollowerInfo. (#311)
---
 .../java/org/apache/ratis/grpc/GrpcFactory.java    |  3 +-
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  6 +-
 .../org/apache/ratis/hadooprpc/HadoopFactory.java  |  4 +-
 .../java/org/apache/ratis/netty/NettyFactory.java  |  4 +-
 .../{FollowerInfo.java => FollowerInfoImpl.java}   | 48 +++++++------
 .../org/apache/ratis/server/impl/LeaderState.java  | 47 +++++++++----
 .../org/apache/ratis/server/impl/LogAppender.java  | 26 +++----
 .../apache/ratis/server/impl/RaftServerImpl.java   |  9 +--
 .../apache/ratis/server/impl/ServerFactory.java    | 15 ++--
 .../apache/ratis/server/leader/FollowerInfo.java   | 81 ++++++++++++++++++++++
 .../ratis/server/metrics/LogAppenderMetrics.java   | 19 ++---
 .../ratis/server/impl/TestLogAppenderMetrics.java  | 49 +++++++++----
 .../ratis/server/simulation/SimulatedRpc.java      |  4 +-
 .../apache/ratis/grpc/TestLogAppenderWithGrpc.java |  2 +-
 14 files changed, 215 insertions(+), 102 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
index 33c9a0b..e6efebe 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -25,6 +25,7 @@ import org.apache.ratis.grpc.server.GrpcLogAppender;
 import org.apache.ratis.grpc.server.GrpcService;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.*;
 import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index c1d4515..7bd4f5c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -23,7 +23,7 @@ import org.apache.ratis.grpc.GrpcUtil;
 import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.FollowerInfo;
+import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.impl.LeaderState;
 import org.apache.ratis.server.impl.LogAppender;
 import org.apache.ratis.server.impl.RaftServerImpl;
@@ -247,7 +247,7 @@ public class GrpcLogAppender extends LogAppender {
   }
 
   private void increaseNextIndex(final long installedSnapshotIndex) {
-    getFollower().updateNextIndexToMax(installedSnapshotIndex + 1);
+    getFollower().updateNextIndex(installedSnapshotIndex + 1);
   }
 
   /**
@@ -346,7 +346,7 @@ public class GrpcLogAppender extends LogAppender {
 
   private synchronized void updateNextIndex(long replyNextIndex) {
     pendingRequests.clear();
-    getFollower().updateNextIndex(replyNextIndex);
+    getFollower().setNextIndex(replyNextIndex);
   }
 
   private class InstallSnapshotResponseHandler implements StreamObserver<InstallSnapshotReplyProto> {
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
index cf4c5fd..f024b81 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,7 +28,7 @@ import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.ServerFactory;
 
-public class HadoopFactory extends ServerFactory.BaseFactory implements ClientFactory {
+public class HadoopFactory implements ServerFactory, ClientFactory {
   public static Parameters newRaftParameters(Configuration conf) {
     final Parameters p = new Parameters();
     HadoopConfigKeys.setConf(p, conf);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
index 450b95f..7f981ae 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -27,7 +27,7 @@ import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.ServerFactory;
 
-public class NettyFactory extends ServerFactory.BaseFactory implements ClientFactory {
+public class NettyFactory implements ServerFactory, ClientFactory {
   public NettyFactory(Parameters parameters) {}
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
similarity index 83%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
rename to ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index c423e68..0f6c1ab 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -19,18 +19,15 @@ package org.apache.ratis.server.impl;
 
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogIndex;
 import org.apache.ratis.util.Timestamp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
-public class FollowerInfo {
-  public static final Logger LOG = LoggerFactory.getLogger(FollowerInfo.class);
-
+class FollowerInfoImpl implements FollowerInfo {
   private final String name;
   private final Consumer<Object> infoIndexChange;
   private final Consumer<Object> debugIndexChange;
@@ -43,10 +40,8 @@ public class FollowerInfo {
   private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
   private final RaftLogIndex snapshotIndex = new RaftLogIndex("snapshotIndex", 0L);
   private volatile boolean attendVote;
-  private final int rpcSlownessTimeoutMs;
 
-  FollowerInfo(RaftGroupMemberId id, RaftPeer peer, Timestamp lastRpcTime, long nextIndex,
-      boolean attendVote, int rpcSlownessTimeoutMs) {
+  FollowerInfoImpl(RaftGroupMemberId id, RaftPeer peer, Timestamp lastRpcTime, long nextIndex, boolean attendVote) {
     this.name = id + "->" + peer.getId();
     this.infoIndexChange = s -> LOG.info("{}: {}", name, s);
     this.debugIndexChange = s -> LOG.debug("{}: {}", name, s);
@@ -56,56 +51,66 @@ public class FollowerInfo {
     this.lastRpcSendTime = new AtomicReference<>(lastRpcTime);
     this.nextIndex = new RaftLogIndex("nextIndex", nextIndex);
     this.attendVote = attendVote;
-    this.rpcSlownessTimeoutMs = rpcSlownessTimeoutMs;
   }
 
+  @Override
   public long getMatchIndex() {
     return matchIndex.get();
   }
 
+  @Override
   public boolean updateMatchIndex(long newMatchIndex) {
     return matchIndex.updateToMax(newMatchIndex, debugIndexChange);
   }
 
-  /** @return the commit index acked by the follower. */
-  long getCommitIndex() {
+  @Override
+  public long getCommitIndex() {
     return commitIndex.get();
   }
 
-  boolean updateCommitIndex(long newCommitIndex) {
+  @Override
+  public boolean updateCommitIndex(long newCommitIndex) {
     return commitIndex.updateToMax(newCommitIndex, debugIndexChange);
   }
 
-  long getSnapshotIndex() {
+  @Override
+  public long getSnapshotIndex() {
     return snapshotIndex.get();
   }
 
+  @Override
   public long getNextIndex() {
     return nextIndex.get();
   }
 
+  @Override
   public void increaseNextIndex(long newNextIndex) {
     nextIndex.updateIncreasingly(newNextIndex, debugIndexChange);
   }
 
+  @Override
   public void decreaseNextIndex(long newNextIndex) {
     nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1, newNextIndex), infoIndexChange);
   }
 
-  public void updateNextIndex(long newNextIndex) {
+  @Override
+  public void setNextIndex(long newNextIndex) {
     nextIndex.updateUnconditionally(old -> newNextIndex >= 0 ? newNextIndex : old, infoIndexChange);
   }
 
-  public void updateNextIndexToMax(long newNextIndex) {
+  @Override
+  public void updateNextIndex(long newNextIndex) {
     nextIndex.updateToMax(newNextIndex, infoIndexChange);
   }
 
+  @Override
   public void setSnapshotIndex(long newSnapshotIndex) {
     snapshotIndex.setUnconditionally(newSnapshotIndex, infoIndexChange);
     matchIndex.setUnconditionally(newSnapshotIndex, infoIndexChange);
     nextIndex.setUnconditionally(newSnapshotIndex + 1, infoIndexChange);
   }
 
+  @Override
   public String getName() {
     return name;
   }
@@ -126,29 +131,28 @@ public class FollowerInfo {
     return attendVote;
   }
 
+  @Override
   public RaftPeer getPeer() {
     return peer;
   }
 
-  /** Update lastRpcResponseTime to the current time. */
+  @Override
   public void updateLastRpcResponseTime() {
     lastRpcResponseTime.set(Timestamp.currentTime());
   }
 
-  Timestamp getLastRpcResponseTime() {
+  @Override
+  public Timestamp getLastRpcResponseTime() {
     return lastRpcResponseTime.get();
   }
 
-  /** Update lastRpcSendTime to the current time. */
+  @Override
   public void updateLastRpcSendTime() {
     lastRpcSendTime.set(Timestamp.currentTime());
   }
 
+  @Override
   public Timestamp getLastRpcTime() {
     return Timestamp.latest(lastRpcResponseTime.get(), lastRpcSendTime.get());
   }
-
-  boolean isSlow() {
-    return lastRpcResponseTime.get().elapsedTimeMs() > rpcSlownessTimeoutMs;
-  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 30eec01..7002c44 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -24,6 +24,7 @@ import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.NotReplicatedException;
 import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
+import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.metrics.LogAppenderMetrics;
 import org.apache.ratis.server.metrics.RaftServerMetrics;
@@ -316,6 +317,14 @@ public class LeaderState {
     return currentTerm;
   }
 
+  boolean handleResponseTerm(FollowerInfo follower, long followerTerm) {
+    if (isAttendingVote(follower) && followerTerm > getCurrentTerm()) {
+      submitStepDownEvent(followerTerm, LeaderState.StepDownReason.HIGHER_TERM);
+      return true;
+    }
+    return false;
+  }
+
   TimeDuration getSyncInterval() {
     return syncInterval;
   }
@@ -432,9 +441,10 @@ public class LeaderState {
     }
   }
 
-  AppendEntriesRequestProto newAppendEntriesRequestProto(RaftPeerId targetId,
-      TermIndex previous, List<LogEntryProto> entries, boolean initializing,
-      long callId) {
+  AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,
+      List<LogEntryProto> entries, TermIndex previous, long callId) {
+    final boolean initializing = isAttendingVote(follower);
+    final RaftPeerId targetId = follower.getPeer().getId();
     return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, currentTerm, entries,
         ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()),
         initializing, previous, server.getCommitInfos(), callId);
@@ -451,12 +461,11 @@ public class LeaderState {
     final Timestamp t = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
     final List<LogAppender> newAppenders = newPeers.stream()
         .map(peer -> {
-          final FollowerInfo f = new FollowerInfo(server.getMemberId(), peer, t, nextIndex, attendVote,
-              server.getRpcSlownessTimeoutMs());
+          final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, t, nextIndex, attendVote);
           LogAppender logAppender = server.newLogAppender(this, f);
           peerIdFollowerInfoMap.put(peer.getId(), f);
           raftServerMetrics.addFollower(peer.getId());
-          logAppenderMetrics.addFollowerGauges(f);
+          logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime);
           return logAppender;
         }).collect(Collectors.toList());
     senders.addAll(newAppenders);
@@ -570,9 +579,8 @@ public class LeaderState {
    *    caught-up.
    * 3. Otherwise the peer is making progressing. Keep waiting.
    */
-  private BootStrapProgress checkProgress(FollowerInfo follower,
-      long committed) {
-    Preconditions.assertTrue(!follower.isAttendingVote());
+  private BootStrapProgress checkProgress(FollowerInfo follower, long committed) {
+    Preconditions.assertTrue(!isAttendingVote(follower));
     final Timestamp progressTime = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
     final Timestamp timeoutTime = Timestamp.currentTime().addTimeMs(-3L * server.getMaxTimeoutMs());
     if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) {
@@ -589,13 +597,17 @@ public class LeaderState {
   private Collection<BootStrapProgress> checkAllProgress(long committed) {
     Preconditions.assertTrue(inStagingState());
     return senders.stream()
-        .filter(sender -> !sender.getFollower().isAttendingVote())
+        .filter(sender -> !isAttendingVote(sender.getFollower()))
         .map(sender -> checkProgress(sender.getFollower(), committed))
         .collect(Collectors.toCollection(ArrayList::new));
   }
 
-  void submitCheckStagingEvent() {
-    eventQueue.submit(checkStagingEvent);
+  void submitEventOnSuccessAppend(FollowerInfo follower) {
+    if (isAttendingVote(follower)) {
+      submitUpdateCommitEvent();
+    } else {
+      eventQueue.submit(checkStagingEvent);
+    }
   }
 
   private void checkStaging() {
@@ -611,7 +623,10 @@ public class LeaderState {
       } else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
         // all caught up!
         applyOldNewConf();
-        senders.forEach(s -> s.getFollower().startAttendVote());
+        senders.stream()
+            .map(LogAppender::getFollower)
+            .map(FollowerInfoImpl.class::cast)
+            .forEach(FollowerInfoImpl::startAttendVote);
       }
     }
   }
@@ -961,7 +976,7 @@ public class LeaderState {
     void fail(BootStrapProgress progress) {
       final String message = this + ": Fail to set configuration " + newConf + " due to " + progress;
       LOG.debug(message);
-      stopAndRemoveSenders(s -> !s.getFollower().isAttendingVote());
+      stopAndRemoveSenders(s -> !isAttendingVote(s.getFollower()));
 
       LeaderState.this.stagingState = null;
       // send back failure response to client's request
@@ -987,6 +1002,10 @@ public class LeaderState {
     return senders.stream();
   }
 
+  private static boolean isAttendingVote(FollowerInfo follower) {
+    return ((FollowerInfoImpl)follower).isAttendingVote();
+  }
+
   /**
    * Record Follower Heartbeat Elapsed Time.
    */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index aac6c05..a784409 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -19,6 +19,7 @@ package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.FileInfo;
@@ -231,8 +232,7 @@ public class LogAppender {
     final long heartbeatRemainingMs = getHeartbeatRemainingTime();
     if (heartbeatRemainingMs <= 0L || heartbeat) {
       // heartbeat
-      return leaderState.newAppendEntriesRequestProto(
-          getFollowerId(), previous, Collections.emptyList(), !follower.isAttendingVote(), callId);
+      return leaderState.newAppendEntriesRequestProto(follower, Collections.emptyList(), previous, callId);
     }
 
     Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + buffer.getNumElements() + " elements.");
@@ -254,8 +254,7 @@ public class LogAppender {
             follower.getName(), entry, time, exception));
     buffer.clear();
     assertProtos(protos, followerNext, previous, snapshotIndex);
-    return leaderState.newAppendEntriesRequestProto(
-        getFollowerId(), previous, protos, !follower.isAttendingVote(), callId);
+    return leaderState.newAppendEntriesRequestProto(follower, protos, previous, callId);
   }
 
   private void assertProtos(List<LogEntryProto> protos, long nextIndex, TermIndex previous, long snapshotIndex) {
@@ -554,19 +553,15 @@ public class LogAppender {
   }
 
   protected void submitEventOnSuccessAppend() {
-    if (follower.isAttendingVote()) {
-      leaderState.submitUpdateCommitEvent();
-    } else {
-      leaderState.submitCheckStagingEvent();
-    }
+    leaderState.submitEventOnSuccessAppend(follower);
   }
 
   protected void checkSlowness() {
-    if (follower.isSlow()) {
+    final TimeDuration lastRpcResponseElapsed = follower.getLastRpcResponseTime().elapsedTime();
+    if (lastRpcResponseElapsed.compareTo(server.getRpcSlownessTimeout()) > 0) {
       server.getStateMachine().leaderEvent().notifyFollowerSlowness(server.getRoleInfoProto());
     }
-    leaderState.recordFollowerHeartbeatElapsedTime(follower.getPeer().getId(),
-        follower.getLastRpcResponseTime().elapsedTime());
+    leaderState.recordFollowerHeartbeatElapsedTime(follower.getPeer().getId(), lastRpcResponseElapsed);
   }
 
   public synchronized void notifyAppend() {
@@ -603,12 +598,7 @@ public class LogAppender {
 
   protected boolean checkResponseTerm(long responseTerm) {
     synchronized (server) {
-      if (isAppenderRunning() && follower.isAttendingVote()
-          && responseTerm > leaderState.getCurrentTerm()) {
-        leaderState.submitStepDownEvent(responseTerm, LeaderState.StepDownReason.HIGHER_TERM);
-        return true;
-      }
+      return isAppenderRunning() && leaderState.handleResponseTerm(follower, responseTerm);
     }
-    return false;
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 02ad561..6971e7c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -33,6 +33,7 @@ import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
 import org.apache.ratis.protocol.exceptions.StaleReadException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.server.DataStreamMap;
+import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerMXBean;
@@ -93,7 +94,7 @@ public class RaftServerImpl implements RaftServer.Division,
   private final int minTimeoutMs;
   private final int maxTimeoutMs;
   private final TimeDuration leaderStepDownWaitTime;
-  private final int rpcSlownessTimeoutMs;
+  private final TimeDuration rpcSlownessTimeout;
   private final TimeDuration sleepDeviationThreshold;
   private final boolean installSnapshotEnabled;
 
@@ -131,7 +132,7 @@ public class RaftServerImpl implements RaftServer.Division,
     final RaftProperties properties = proxy.getProperties();
     minTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties).toIntExact(TimeUnit.MILLISECONDS);
     maxTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMax(properties).toIntExact(TimeUnit.MILLISECONDS);
-    rpcSlownessTimeoutMs = RaftServerConfigKeys.Rpc.slownessTimeout(properties).toIntExact(TimeUnit.MILLISECONDS);
+    this.rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties);
     leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
     this.sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties);
     installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
@@ -178,8 +179,8 @@ public class RaftServerImpl implements RaftServer.Division,
     return maxTimeoutMs;
   }
 
-  int getRpcSlownessTimeoutMs() {
-    return rpcSlownessTimeoutMs;
+  TimeDuration getRpcSlownessTimeout() {
+    return rpcSlownessTimeout;
   }
 
   TimeDuration getRandomElectionTimeout() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
index e0149f2..7300ddc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,6 +18,7 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.ratis.rpc.RpcFactory;
+import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerRpc;
 
@@ -33,15 +34,9 @@ public interface ServerFactory extends RpcFactory {
   }
 
   /** Create a new {@link LogAppender}. */
-  LogAppender newLogAppender(RaftServerImpl server, LeaderState state, FollowerInfo f);
+  default LogAppender newLogAppender(RaftServerImpl server, LeaderState state, FollowerInfo f) {
+    return new LogAppender(server, state, f);
+  }
 
   RaftServerRpc newRaftServerRpc(RaftServer server);
-
-  abstract class BaseFactory implements ServerFactory {
-    @Override
-    public LogAppender newLogAppender(
-        RaftServerImpl server, LeaderState state, FollowerInfo f) {
-      return new LogAppender(server, state, f);
-    }
-  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
new file mode 100644
index 0000000..487576f
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.leader;
+
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.Timestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Information about a follower, provided that the local server is the leader.
+ */
+public interface FollowerInfo {
+  Logger LOG = LoggerFactory.getLogger(FollowerInfo.class);
+
+  /** @return the name of this object. */
+  String getName();
+
+  /** @return this follower's peer info. */
+  RaftPeer getPeer();
+
+  /** @return the matchIndex acknowledged by this follower. */
+  long getMatchIndex();
+
+  /** Update this follower's matchIndex. */
+  boolean updateMatchIndex(long newMatchIndex);
+
+  /** @return the commitIndex acknowledged by this follower. */
+  long getCommitIndex();
+
+  /** Update follower's commitIndex. */
+  boolean updateCommitIndex(long newCommitIndex);
+
+  /** @return the snapshotIndex acknowledged by this follower. */
+  long getSnapshotIndex();
+
+  /** Set follower's snapshotIndex. */
+  void setSnapshotIndex(long newSnapshotIndex);
+
+  /** @return the nextIndex for this follower. */
+  long getNextIndex();
+
+  /** Increase the nextIndex for this follower. */
+  void increaseNextIndex(long newNextIndex);
+
+  /** Decrease the nextIndex for this follower. */
+  void decreaseNextIndex(long newNextIndex);
+
+  /** Set the nextIndex for this follower. */
+  void setNextIndex(long newNextIndex);
+
+  /** Update the nextIndex for this follower. */
+  void updateNextIndex(long newNextIndex);
+
+  /** @return the lastRpcResponseTime. */
+  Timestamp getLastRpcResponseTime();
+
+  /** Update lastRpcResponseTime to the current time. */
+  void updateLastRpcResponseTime();
+
+  /** Update lastRpcSendTime to the current time. */
+  void updateLastRpcSendTime();
+
+  /** @return the latest of the lastRpcSendTime and the lastRpcResponseTime. */
+  Timestamp getLastRpcTime();
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java
index e4213c0..ea6c940 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java
@@ -25,7 +25,11 @@ import org.apache.ratis.metrics.MetricRegistryInfo;
 import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.metrics.RatisMetrics;
 import org.apache.ratis.protocol.RaftGroupMemberId;
-import org.apache.ratis.server.impl.FollowerInfo;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.Timestamp;
+
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 
 public final class LogAppenderMetrics extends RatisMetrics {
   public static final String RATIS_LOG_APPENDER_METRICS = "log_appender";
@@ -41,13 +45,10 @@ public final class LogAppenderMetrics extends RatisMetrics {
         RATIS_LOG_APPENDER_METRICS, RATIS_LOG_APPENDER_METRICS_DESC));
   }
 
-  public void addFollowerGauges(FollowerInfo followerInfo) {
-    registry.gauge(String.format(FOLLOWER_NEXT_INDEX,
-        followerInfo.getPeer().getId().toString()),
-        () -> followerInfo::getNextIndex);
-    registry.gauge(String.format(FOLLOWER_MATCH_INDEX, followerInfo.getPeer().getId().toString()),
-        () -> followerInfo::getMatchIndex);
-    registry.gauge(String.format(FOLLOWER_RPC_RESP_TIME, followerInfo.getPeer().getId().toString()),
-        () -> () -> followerInfo.getLastRpcTime().elapsedTimeMs());
+  public void addFollowerGauges(RaftPeerId id, LongSupplier getNextIndex, LongSupplier getMatchIndex,
+      Supplier<Timestamp> getLastRpcTime) {
+    registry.gauge(String.format(FOLLOWER_NEXT_INDEX, id), () -> getNextIndex::getAsLong);
+    registry.gauge(String.format(FOLLOWER_MATCH_INDEX, id), () -> getMatchIndex::getAsLong);
+    registry.gauge(String.format(FOLLOWER_RPC_RESP_TIME, id), () -> () -> getLastRpcTime.get().elapsedTimeMs());
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestLogAppenderMetrics.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestLogAppenderMetrics.java
index 74148f9..f2e3383 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestLogAppenderMetrics.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestLogAppenderMetrics.java
@@ -20,13 +20,10 @@ package org.apache.ratis.server.impl;
 import static org.apache.ratis.server.metrics.RaftLogMetrics.FOLLOWER_MATCH_INDEX;
 import static org.apache.ratis.server.metrics.RaftLogMetrics.FOLLOWER_NEXT_INDEX;
 import static org.apache.ratis.server.metrics.RaftLogMetrics.FOLLOWER_RPC_RESP_TIME;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
-import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.metrics.LogAppenderMetrics;
 import org.apache.ratis.util.Timestamp;
@@ -40,20 +37,18 @@ public class TestLogAppenderMetrics {
 
   private RatisMetricRegistry ratisMetricRegistry;
   private RaftPeerId raftPeerId;
-  private FollowerInfo followerInfo;
+  private MyFollowerInfo followerInfo;
 
   @Before
   public void setup() {
     RaftGroupId raftGroupId = RaftGroupId.randomId();
     raftPeerId = RaftPeerId.valueOf("TestId");
-    final RaftPeer raftPeer = RaftPeer.newBuilder().setId(raftPeerId).build();
     RaftGroupMemberId raftGroupMemberId = RaftGroupMemberId.valueOf(raftPeerId, raftGroupId);
-    LogAppender logAppender = mock(LogAppender.class);
-    followerInfo = new TestFollowerInfo(raftGroupMemberId, raftPeer, Timestamp.currentTime(), 100L, true, 1000);
-    when(logAppender.getFollower()).thenReturn(followerInfo);
+    followerInfo = new MyFollowerInfo(100L);
     LogAppenderMetrics logAppenderMetrics = new LogAppenderMetrics(raftGroupMemberId);
     ratisMetricRegistry = logAppenderMetrics.getRegistry();
-    logAppenderMetrics.addFollowerGauges(followerInfo);
+    logAppenderMetrics.addFollowerGauges(raftPeerId, followerInfo::getNextIndex, followerInfo::getMatchIndex,
+        followerInfo::getLastRpcTime);
   }
 
   @Test
@@ -75,11 +70,37 @@ public class TestLogAppenderMetrics {
     Assert.assertNotNull(rpcTime.getValue());
   }
 
-  private static class TestFollowerInfo extends FollowerInfo {
-    TestFollowerInfo(RaftGroupMemberId id, RaftPeer peer, Timestamp
-        lastRpcTime, long nextIndex, boolean attendVote, int
-        rpcSlownessTimeoutMs) {
-      super(id, peer, lastRpcTime, nextIndex, attendVote, rpcSlownessTimeoutMs);
+  private static class MyFollowerInfo {
+    private volatile long nextIndex;
+    private volatile long matchIndex;
+    private volatile Timestamp lastRpcTime = Timestamp.currentTime();
+
+    MyFollowerInfo(long nextIndex) {
+      this.nextIndex = nextIndex;
+    }
+
+    long getNextIndex() {
+      return nextIndex;
+    }
+
+    void updateNextIndex(long nextIndex) {
+      this.nextIndex = nextIndex;
+    }
+
+    long getMatchIndex() {
+      return matchIndex;
+    }
+
+    void updateMatchIndex(long matchIndex) {
+      this.matchIndex = matchIndex;
+    }
+
+    Timestamp getLastRpcTime() {
+      return lastRpcTime;
+    }
+
+    void updateLastRpcResponseTime() {
+      lastRpcTime = Timestamp.currentTime();
     }
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
index 11fe2ee..f5ee907 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -41,7 +41,7 @@ class SimulatedRpc implements RpcType {
     return new Factory(parameters);
   }
 
-  static class Factory extends ServerFactory.BaseFactory implements ClientFactory {
+  static class Factory implements ServerFactory, ClientFactory {
     static String SERVER_REQUEST_REPLY_KEY = "raft.simulated.serverRequestReply";
     static String CLIENT_TO_SERVER_REQUEST_REPLY_KEY = "raft.simulated.client2serverRequestReply";
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
index 38d31c5..067baff 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
@@ -27,7 +27,7 @@ import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.FollowerInfo;
+import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;