You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/04 13:04:19 UTC

[incubator-ratis] branch master updated: RATIS-1205. Change LogAppender to use RaftServer.Division and LeaderState. (#323)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 3569b13  RATIS-1205. Change LogAppender to use RaftServer.Division and LeaderState. (#323)
3569b13 is described below

commit 3569b13d684ec1ac50f3a6d4c7560d5711ef22af
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Dec 4 21:04:13 2020 +0800

    RATIS-1205. Change LogAppender to use RaftServer.Division and LeaderState. (#323)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  9 ++--
 .../apache/ratis/server/DivisionProperties.java    | 55 +++++++++++++++++++
 .../java/org/apache/ratis/server/RaftServer.java   | 14 +++++
 .../ratis/server/impl/DivisionPropertiesImpl.java  | 61 ++++++++++++++++++++++
 .../apache/ratis/server/impl/FollowerState.java    |  2 +-
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 40 ++++++++------
 .../org/apache/ratis/server/impl/LogAppender.java  | 56 ++++++++------------
 .../apache/ratis/server/impl/RaftServerImpl.java   | 30 +++++------
 .../apache/ratis/server/impl/RaftServerProxy.java  |  8 +--
 .../org/apache/ratis/server/impl/ServerState.java  |  2 +-
 .../apache/ratis/server/leader/LeaderState.java    | 26 +++++++++
 .../ratis/server/impl/RaftServerTestUtil.java      |  2 +-
 .../ratis/datastream/DataStreamBaseTest.java       | 15 +++++-
 13 files changed, 237 insertions(+), 83 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 38a2e41..85d3572 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -140,8 +140,7 @@ public class GrpcLogAppender extends LogAppender {
         appendLog(installSnapshotRequired || haveTooManyPendingRequests());
 
       }
-      checkSlowness();
-
+      getLeaderState().checkHealth(getFollower());
     }
 
     Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted);
@@ -299,9 +298,9 @@ public class GrpcLogAppender extends LogAppender {
       switch (reply.getResult()) {
         case SUCCESS:
           grpcServerMetrics.onRequestSuccess(getFollowerId().toString(), reply.getIsHearbeat());
-          updateCommitIndex(reply.getFollowerCommit());
+          getLeaderState().onFollowerCommitIndex(getFollower(), reply.getFollowerCommit());
           if (getFollower().updateMatchIndex(reply.getMatchIndex())) {
-            submitEventOnSuccessAppend();
+            getLeaderState().onFollowerSuccessAppendEntries(getFollower());
           }
           break;
         case NOT_LEADER:
@@ -417,7 +416,7 @@ public class GrpcLogAppender extends LogAppender {
           final long followerSnapshotIndex = reply.getSnapshotIndex();
           LOG.info("{}: Already Installed Snapshot Index {}.", this, followerSnapshotIndex);
           getFollower().setSnapshotIndex(followerSnapshotIndex);
-          updateCommitIndex(followerSnapshotIndex);
+          getLeaderState().onFollowerCommitIndex(getFollower(), followerSnapshotIndex);
           increaseNextIndex(followerSnapshotIndex);
           removePending(reply);
           break;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/DivisionProperties.java b/ratis-server/src/main/java/org/apache/ratis/server/DivisionProperties.java
new file mode 100644
index 0000000..a7b8c9d
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DivisionProperties.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server;
+
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The properties set for a server division.
+ *
+ * @see RaftServerConfigKeys
+ */
+public interface DivisionProperties {
+  Logger LOG = LoggerFactory.getLogger(DivisionProperties.class);
+
+  /** @return the minimum rpc timeout. */
+  TimeDuration minRpcTimeout();
+
+  /** @return the minimum rpc timeout in milliseconds. */
+  default int minRpcTimeoutMs() {
+    return minRpcTimeout().toIntExact(TimeUnit.MILLISECONDS);
+  }
+
+  /** @return the maximum rpc timeout. */
+  TimeDuration maxRpcTimeout();
+
+  /** @return the maximum rpc timeout in milliseconds. */
+  default int maxRpcTimeoutMs() {
+    return maxRpcTimeout().toIntExact(TimeUnit.MILLISECONDS);
+  }
+
+  /** @return the rpc sleep time period. */
+  TimeDuration rpcSleepTime();
+
+  /** @return the rpc slowness timeout. */
+  TimeDuration rpcSlownessTimeout();
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index d6e267b..b76705f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -25,6 +25,7 @@ import org.apache.ratis.rpc.RpcFactory;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.ServerImplUtils;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
 import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
 import org.apache.ratis.server.raftlog.RaftLog;
@@ -50,6 +51,9 @@ public interface RaftServer extends Closeable, RpcType.Get,
   interface Division extends Closeable {
     Logger LOG = LoggerFactory.getLogger(Division.class);
 
+    /** @return the {@link DivisionProperties} for this division. */
+    DivisionProperties properties();
+
     /** @return the {@link RaftGroupMemberId} for this division. */
     RaftGroupMemberId getMemberId();
 
@@ -79,6 +83,9 @@ public interface RaftServer extends Closeable, RpcType.Get,
     /** @return the {@link RaftServer} containing this division. */
     RaftServer getRaftServer();
 
+    /** @return the {@link RaftServerMetrics} for this division. */
+    RaftServerMetrics getRaftServerMetrics();
+
     /** @return the {@link StateMachine} for this division. */
     StateMachine getStateMachine();
 
@@ -115,6 +122,13 @@ public interface RaftServer extends Closeable, RpcType.Get,
   /** @return the server properties. */
   RaftProperties getProperties();
 
+  /** @return the rpc service. */
+  RaftServerRpc getServerRpc();
+
+  default RpcType getRpcType() {
+    return getFactory().getRpcType();
+  }
+
   /** @return the factory for creating server components. */
   RpcFactory getFactory();
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java
new file mode 100644
index 0000000..63cbc02
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.DivisionProperties;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.TimeDuration;
+
+class DivisionPropertiesImpl implements DivisionProperties {
+  private final TimeDuration rpcTimeoutMin;
+  private final TimeDuration rpcTimeoutMax;
+  private final TimeDuration rpcSleepTime;
+  private final TimeDuration rpcSlownessTimeout;
+
+  DivisionPropertiesImpl(RaftProperties properties) {
+    this.rpcTimeoutMin = RaftServerConfigKeys.Rpc.timeoutMin(properties);
+    this.rpcTimeoutMax = RaftServerConfigKeys.Rpc.timeoutMax(properties);
+    Preconditions.assertTrue(rpcTimeoutMax.compareTo(rpcTimeoutMin) >= 0,
+        "rpcTimeoutMax = %s < rpcTimeoutMin = %s", rpcTimeoutMax, rpcTimeoutMin);
+
+    this.rpcSleepTime = RaftServerConfigKeys.Rpc.sleepTime(properties);
+    this.rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties);
+  }
+
+  @Override
+  public TimeDuration minRpcTimeout() {
+    return rpcTimeoutMin;
+  }
+
+  @Override
+  public TimeDuration maxRpcTimeout() {
+    return rpcTimeoutMax;
+  }
+
+  @Override
+  public TimeDuration rpcSleepTime() {
+    return rpcSleepTime;
+  }
+
+  @Override
+  public TimeDuration rpcSlownessTimeout() {
+    return rpcSlownessTimeout;
+  }
+}
\ No newline at end of file
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index 14bff6d..c54291a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -87,7 +87,7 @@ class FollowerState extends Daemon {
   }
 
   boolean shouldWithholdVotes() {
-    return lastRpcTime.elapsedTimeMs() < server.getMinTimeoutMs();
+    return lastRpcTime.elapsedTime().compareTo(server.properties().minRpcTimeout()) < 0;
   }
 
   void stopRunning() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 1543dc3..3d52e44 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -242,7 +242,6 @@ class LeaderStateImpl implements LeaderState {
   private volatile boolean running = true;
 
   private final int stagingCatchupGap;
-  private final TimeDuration syncInterval;
   private final long placeHolderIndex;
   private final RaftServerMetrics raftServerMetrics;
   private final LogAppenderMetrics logAppenderMetrics;
@@ -253,7 +252,6 @@ class LeaderStateImpl implements LeaderState {
 
     final RaftProperties properties = server.getRaftServer().getProperties();
     stagingCatchupGap = RaftServerConfigKeys.stagingCatchupGap(properties);
-    syncInterval = RaftServerConfigKeys.Rpc.sleepTime(properties);
 
     final ServerState state = server.getState();
     this.raftLog = state.getLog();
@@ -326,7 +324,8 @@ class LeaderStateImpl implements LeaderState {
     return currentTerm;
   }
 
-  boolean handleResponseTerm(FollowerInfo follower, long followerTerm) {
+  @Override
+  public boolean onFollowerTerm(FollowerInfo follower, long followerTerm) {
     if (isAttendingVote(follower) && followerTerm > getCurrentTerm()) {
       submitStepDownEvent(followerTerm, StepDownReason.HIGHER_TERM);
       return true;
@@ -334,10 +333,6 @@ class LeaderStateImpl implements LeaderState {
     return false;
   }
 
-  TimeDuration getSyncInterval() {
-    return syncInterval;
-  }
-
   /**
    * Start bootstrapping new peers
    */
@@ -413,7 +408,14 @@ class LeaderStateImpl implements LeaderState {
     }
   }
 
-  void commitIndexChanged() {
+  @Override
+  public void onFollowerCommitIndex(FollowerInfo follower, long commitIndex) {
+    if (follower.updateCommitIndex(commitIndex)) {
+      commitIndexChanged();
+    }
+  }
+
+  private void commitIndexChanged() {
     getMajorityMin(FollowerInfo::getCommitIndex, raftLog::getLastCommittedIndex).ifPresent(m -> {
       // Normally, leader commit index is always ahead of followers.
       // However, after a leader change, the new leader commit index may
@@ -450,7 +452,8 @@ class LeaderStateImpl implements LeaderState {
     }
   }
 
-  AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,
+  @Override
+  public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,
       List<LogEntryProto> entries, TermIndex previous, long callId) {
     final boolean initializing = isAttendingVote(follower);
     final RaftPeerId targetId = follower.getPeer().getId();
@@ -487,7 +490,8 @@ class LeaderStateImpl implements LeaderState {
     senders.removeAll(toStop);
   }
 
-  void restartSender(LogAppender sender) {
+  @Override
+  public void restart(LogAppender sender) {
     final FollowerInfo follower = sender.getFollower();
     LOG.info("{}: Restarting {} for {}", this, JavaUtils.getClassSimpleName(sender.getClass()), follower.getName());
     sender.stopAppender();
@@ -611,7 +615,8 @@ class LeaderStateImpl implements LeaderState {
         .collect(Collectors.toCollection(ArrayList::new));
   }
 
-  void submitEventOnSuccessAppend(FollowerInfo follower) {
+  @Override
+  public void onFollowerSuccessAppendEntries(FollowerInfo follower) {
     if (isAttendingVote(follower)) {
       submitUpdateCommitEvent();
     } else {
@@ -767,7 +772,7 @@ class LeaderStateImpl implements LeaderState {
           LOG.info("{} is not included in the new configuration {}. Will shutdown server...", this, conf);
           try {
             // leave some time for all RPC senders to send out new conf entry
-            Thread.sleep(server.getMinTimeoutMs());
+            server.properties().minRpcTimeout().sleep();
           } catch (InterruptedException ignored) {
             Thread.currentThread().interrupt();
           }
@@ -1015,10 +1020,13 @@ class LeaderStateImpl implements LeaderState {
     return ((FollowerInfoImpl)follower).isAttendingVote();
   }
 
-  /**
-   * Record Follower Heartbeat Elapsed Time.
-   */
-  void recordFollowerHeartbeatElapsedTime(RaftPeerId followerId, TimeDuration elapsedTime) {
+  @Override
+  public void checkHealth(FollowerInfo follower) {
+    final TimeDuration elapsedTime = follower.getLastRpcResponseTime().elapsedTime();
+    if (elapsedTime.compareTo(server.properties().rpcSlownessTimeout()) > 0) {
+      server.getStateMachine().leaderEvent().notifyFollowerSlowness(server.getInfo().getRoleInfoProto());
+    }
+    final RaftPeerId followerId = follower.getPeer().getId();
     raftServerMetrics.recordFollowerHeartbeatElapsedTime(followerId, elapsedTime.toLong(TimeUnit.NANOSECONDS));
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index d9aeab4..493b820 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -96,7 +96,7 @@ public class LogAppender {
             lifeCycle.transitionIfNotEqual(EXCEPTION);
           }
           if (lifeCycle.getCurrentState() == EXCEPTION) {
-            leaderState.restartSender(LogAppender.this);
+            leaderState.restart(LogAppender.this);
           }
         }
       }
@@ -136,8 +136,8 @@ public class LogAppender {
   }
 
   private final String name;
-  private final RaftServerImpl server;
-  private final LeaderStateImpl leaderState;
+  private final RaftServer.Division server;
+  private final LeaderState leaderState;
   private final RaftLog raftLog;
   private final FollowerInfo follower;
 
@@ -150,13 +150,13 @@ public class LogAppender {
   public LogAppender(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
     this.follower = f;
     this.name = follower.getName() + "-" + JavaUtils.getClassSimpleName(getClass());
-    this.server = (RaftServerImpl) server;
-    this.leaderState = (LeaderStateImpl) leaderState;
-    this.raftLog = this.server.getState().getLog();
+    this.server = server;
+    this.leaderState = leaderState;
+    this.raftLog = server.getRaftLog();
 
     final RaftProperties properties = server.getRaftServer().getProperties();
     this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt();
-    this.halfMinTimeoutMs = this.server.getMinTimeoutMs() / 2;
+    this.halfMinTimeoutMs = server.properties().minRpcTimeoutMs() / 2;
 
     final SizeInBytes bufferByteLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
     final int bufferElementLimit = RaftServerConfigKeys.Log.Appender.bufferElementLimit(properties);
@@ -201,6 +201,10 @@ public class LogAppender {
     return getFollower().getPeer().getId();
   }
 
+  public LeaderState getLeaderState() {
+    return leaderState;
+  }
+
   private TermIndex getPrevious(long nextIndex) {
     if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) {
       return null;
@@ -212,7 +216,7 @@ public class LogAppender {
       return previous;
     }
 
-    final SnapshotInfo snapshot = server.getState().getLatestSnapshot();
+    final SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
     if (snapshot != null) {
       final TermIndex snapshotTermIndex = snapshot.getTermIndex();
       if (snapshotTermIndex.getIndex() == previousIndex) {
@@ -294,10 +298,10 @@ public class LogAppender {
         }
 
         follower.updateLastRpcSendTime();
-        final AppendEntriesReplyProto r = server.getServerRpc().appendEntries(request);
+        final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
         follower.updateLastRpcResponseTime();
 
-        updateCommitIndex(r.getFollowerCommit());
+        getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit());
         return r;
       } catch (InterruptedIOException | RaftLogIOException e) {
         throw e;
@@ -309,18 +313,12 @@ public class LogAppender {
         handleException(ioe);
       }
       if (isAppenderRunning()) {
-        leaderState.getSyncInterval().sleep();
+        server.properties().rpcSleepTime().sleep();
       }
     }
     return null;
   }
 
-  protected void updateCommitIndex(long commitIndex) {
-    if (follower.updateCommitIndex(commitIndex)) {
-      leaderState.commitIndexChanged();
-    }
-  }
-
   protected InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
     Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() > 0);
     synchronized (server) {
@@ -340,7 +338,7 @@ public class LogAppender {
     try {
       for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
         follower.updateLastRpcSendTime();
-        reply = server.getServerRpc().installSnapshot(request);
+        reply = getServerRpc().installSnapshot(request);
         follower.updateLastRpcResponseTime();
 
         if (!reply.getServerReply().getSuccess()) {
@@ -369,7 +367,7 @@ public class LogAppender {
     // 1. there is no local log entry but there is snapshot
     // 2. or the follower's next index is smaller than the log start index
     if (follower.getNextIndex() < raftLog.getNextIndex()) {
-      SnapshotInfo snapshot = server.getState().getLatestSnapshot();
+      final SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
       if (follower.getNextIndex() < logStartIndex ||
           (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
         return snapshot;
@@ -406,7 +404,7 @@ public class LogAppender {
           }
         }
       }
-      checkSlowness();
+      getLeaderState().checkHealth(getFollower());
     }
   }
 
@@ -425,7 +423,7 @@ public class LogAppender {
           if (nextIndex > oldNextIndex) {
             follower.updateMatchIndex(nextIndex - 1);
             follower.increaseNextIndex(nextIndex);
-            submitEventOnSuccessAppend();
+            getLeaderState().onFollowerSuccessAppendEntries(getFollower());
           }
           break;
         case NOT_LEADER:
@@ -445,19 +443,7 @@ public class LogAppender {
 
   private void handleException(Exception e) {
     LOG.trace("TRACE", e);
-    server.getServerRpc().handleException(follower.getPeer().getId(), e, false);
-  }
-
-  protected void submitEventOnSuccessAppend() {
-    leaderState.submitEventOnSuccessAppend(follower);
-  }
-
-  protected void checkSlowness() {
-    final TimeDuration lastRpcResponseElapsed = follower.getLastRpcResponseTime().elapsedTime();
-    if (lastRpcResponseElapsed.compareTo(server.getRpcSlownessTimeout()) > 0) {
-      server.getStateMachine().leaderEvent().notifyFollowerSlowness(server.getInfo().getRoleInfoProto());
-    }
-    leaderState.recordFollowerHeartbeatElapsedTime(follower.getPeer().getId(), lastRpcResponseElapsed);
+    getServerRpc().handleException(getFollowerId(), e, false);
   }
 
   public synchronized void notifyAppend() {
@@ -494,7 +480,7 @@ public class LogAppender {
 
   protected boolean checkResponseTerm(long responseTerm) {
     synchronized (server) {
-      return isAppenderRunning() && leaderState.handleResponseTerm(follower, responseTerm);
+      return isAppenderRunning() && leaderState.onFollowerTerm(follower, responseTerm);
     }
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 4a9bac6..3bca7a0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -34,6 +34,7 @@ import org.apache.ratis.protocol.exceptions.StaleReadException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.server.DataStreamMap;
 import org.apache.ratis.server.DivisionInfo;
+import org.apache.ratis.server.DivisionProperties;
 import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -134,10 +135,9 @@ class RaftServerImpl implements RaftServer.Division,
   private final StateMachine stateMachine;
   private final Info info =  new Info();
 
-  private final int minTimeoutMs;
+  private final DivisionProperties divisionProperties;
   private final int maxTimeoutMs;
   private final TimeDuration leaderStepDownWaitTime;
-  private final TimeDuration rpcSlownessTimeout;
   private final TimeDuration sleepDeviationThreshold;
   private final boolean installSnapshotEnabled;
 
@@ -173,14 +173,11 @@ class RaftServerImpl implements RaftServer.Division,
     this.role = new RoleInfo(id);
 
     final RaftProperties properties = proxy.getProperties();
-    minTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties).toIntExact(TimeUnit.MILLISECONDS);
-    maxTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMax(properties).toIntExact(TimeUnit.MILLISECONDS);
-    this.rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties);
+    this.divisionProperties = new DivisionPropertiesImpl(properties);
+    maxTimeoutMs = properties().maxRpcTimeoutMs();
     leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
     this.sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties);
     installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
-    Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs,
-        "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
     this.proxy = proxy;
 
     this.state = new ServerState(id, group, properties, this, stateMachine);
@@ -205,6 +202,11 @@ class RaftServerImpl implements RaftServer.Division,
     });
   }
 
+  @Override
+  public DivisionProperties properties() {
+    return divisionProperties;
+  }
+
   private RetryCache initRetryCache(RaftProperties prop) {
     final TimeDuration expireTime = RaftServerConfigKeys.RetryCache.expiryTime(prop);
     return new RetryCache(expireTime);
@@ -214,20 +216,13 @@ class RaftServerImpl implements RaftServer.Division,
     return getRaftServer().getFactory().newLogAppender(this, leaderState, f);
   }
 
-  int getMinTimeoutMs() {
-    return minTimeoutMs;
-  }
-
   int getMaxTimeoutMs() {
     return maxTimeoutMs;
   }
 
-  TimeDuration getRpcSlownessTimeout() {
-    return rpcSlownessTimeout;
-  }
-
   TimeDuration getRandomElectionTimeout() {
-    final long millis = minTimeoutMs + ThreadLocalRandom.current().nextInt(maxTimeoutMs - minTimeoutMs + 1);
+    final int min = properties().minRpcTimeoutMs();
+    final int millis = min + ThreadLocalRandom.current().nextInt(properties().maxRpcTimeoutMs() - min + 1);
     return TimeDuration.valueOf(millis, TimeUnit.MILLISECONDS);
   }
 
@@ -1599,7 +1594,8 @@ class RaftServerImpl implements RaftServer.Division,
     return leaderElectionMetrics;
   }
 
-  RaftServerMetrics getRaftServerMetrics() {
+  @Override
+  public RaftServerMetrics getRaftServerMetrics() {
     return raftServerMetrics;
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index df702bb..81b010c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -292,11 +292,6 @@ class RaftServerProxy implements RaftServer {
   }
 
   @Override
-  public RpcType getRpcType() {
-    return getFactory().getRpcType();
-  }
-
-  @Override
   public ServerFactory getFactory() {
     return factory;
   }
@@ -306,7 +301,8 @@ class RaftServerProxy implements RaftServer {
     return properties;
   }
 
-  RaftServerRpc getServerRpc() {
+  @Override
+  public RaftServerRpc getServerRpc() {
     return serverRpc;
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index a40e0d1..855f4a0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -429,7 +429,7 @@ class ServerState implements Closeable {
     latestInstalledSnapshot.set(lastTermIndexInSnapshot);
   }
 
-  SnapshotInfo getLatestSnapshot() {
+  private SnapshotInfo getLatestSnapshot() {
     return server.getStateMachine().getLatestSnapshot();
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java
index da635a5..24649d7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java
@@ -17,12 +17,19 @@
  */
 package org.apache.ratis.server.leader;
 
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.impl.LogAppender;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.util.JavaUtils;
 
+import java.util.List;
+
 /**
  * States for leader only.
  */
 public interface LeaderState {
+  /** The reasons that this leader steps down and becomes a follower. */
   enum StepDownReason {
     HIGHER_TERM, HIGHER_PRIORITY, LOST_MAJORITY_HEARTBEATS, STATE_MACHINE_EXCEPTION, JVM_PAUSE;
 
@@ -33,4 +40,23 @@ public interface LeaderState {
       return longName;
     }
   }
+
+  /** Restart the given {@link LogAppender}. */
+  void restart(LogAppender appender);
+
+  /** @return a new {@link AppendEntriesRequestProto} object. */
+  AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,
+      List<LogEntryProto> entries, TermIndex previous, long callId);
+
+  /** Check if the follower is healthy. */
+  void checkHealth(FollowerInfo follower);
+
+  /** Handle the event that the follower has replied a term. */
+  boolean onFollowerTerm(FollowerInfo follower, long followerTerm);
+
+  /** Handle the event that the follower has replied a commit index. */
+  void onFollowerCommitIndex(FollowerInfo follower, long commitIndex);
+
+  /** Handle the event that the follower has replied a success append entries. */
+  void onFollowerSuccessAppendEntries(FollowerInfo follower);
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index cf9307c..f216189 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -159,7 +159,7 @@ public class RaftServerTestUtil {
   public static void restartLogAppenders(RaftServer.Division server) {
     final LeaderStateImpl leaderState = getLeaderState(server).orElseThrow(
         () -> new IllegalStateException(server + " is not the leader"));
-    leaderState.getLogAppenders().forEach(leaderState::restartSender);
+    leaderState.getLogAppenders().forEach(leaderState::restart);
   }
 
   public static RaftServer.Division getDivision(RaftServer server, RaftGroupId groupId) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 8d94b9b..0533323 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -19,6 +19,8 @@ package org.apache.ratis.datastream;
 
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.server.DivisionInfo;
+import org.apache.ratis.server.DivisionProperties;
+import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.impl.ClientProtoUtils;
@@ -56,6 +58,7 @@ import org.apache.ratis.server.impl.DataStreamServerImpl;
 import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.StateMachine;
@@ -88,6 +91,11 @@ abstract class DataStreamBaseTest extends BaseTest {
     }
 
     @Override
+    public DivisionProperties properties() {
+      return null;
+    }
+
+    @Override
     public RaftGroupMemberId getMemberId() {
       return null;
     }
@@ -108,6 +116,11 @@ abstract class DataStreamBaseTest extends BaseTest {
     }
 
     @Override
+    public RaftServerMetrics getRaftServerMetrics() {
+      return null;
+    }
+
+    @Override
     public MultiDataStreamStateMachine getStateMachine() {
       return stateMachine;
     }
@@ -236,7 +249,7 @@ abstract class DataStreamBaseTest extends BaseTest {
       }
 
       @Override
-      public RpcType getRpcType() {
+      public RaftServerRpc getServerRpc() {
         return null;
       }