You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/07 05:34:21 UTC

[incubator-ratis] branch master updated: RATIS-1208. Separate LogAppender interface from the implementation. (#326)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new e5a052c  RATIS-1208. Separate LogAppender interface from the implementation. (#326)
e5a052c is described below

commit e5a052c18766293b94cc89e3a34a469193009537
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Dec 7 13:34:14 2020 +0800

    RATIS-1208. Separate LogAppender interface from the implementation. (#326)
    
    * RATIS-1208. Separate LogAppender interface from the implementation.
    
    * Fix a bug in SimpleStateMachine4Testing.
---
 .../java/org/apache/ratis/grpc/GrpcFactory.java    |   4 +-
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  42 +++---
 .../org/apache/ratis/hadooprpc/HadoopFactory.java  |   2 +-
 .../java/org/apache/ratis/netty/NettyFactory.java  |   2 +-
 .../java/org/apache/ratis/server/RaftServer.java   |   3 +-
 .../ratis/server/{impl => }/ServerFactory.java     |   8 +-
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  25 ++--
 .../{LogAppender.java => LogAppenderBase.java}     | 148 +++++++--------------
 .../ratis/server/impl/LogAppenderDaemon.java       |   3 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |   1 +
 .../apache/ratis/server/impl/RaftServerProxy.java  |   1 +
 .../apache/ratis/server/leader/LeaderState.java    |   1 -
 .../apache/ratis/server/leader/LogAppender.java    | 118 ++++++++++++++++
 .../java/org/apache/ratis/LogAppenderTests.java    |   2 +-
 .../apache/ratis/server/impl/MiniRaftCluster.java  |   1 +
 .../ratis/server/impl/RaftServerTestUtil.java      |   1 +
 .../ratis/server/simulation/SimulatedRpc.java      |   2 +-
 .../statemachine/SimpleStateMachine4Testing.java   |   2 +-
 .../ratis/datastream/DataStreamBaseTest.java       |   3 +-
 19 files changed, 223 insertions(+), 146 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
index c055848..3f33e44 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
@@ -26,8 +26,8 @@ import org.apache.ratis.grpc.server.GrpcService;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.LogAppender;
-import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.server.ServerFactory;
 import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.leader.LeaderState;
 import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 85d3572..26329db 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -24,9 +24,9 @@ import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.LogAppenderBase;
 import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.leader.LeaderState;
-import org.apache.ratis.server.impl.LogAppender;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
@@ -49,7 +49,7 @@ import com.codahale.metrics.Timer;
 /**
  * A new log appender implementation using grpc bi-directional stream API.
  */
-public class GrpcLogAppender extends LogAppender {
+public class GrpcLogAppender extends LogAppenderBase {
   public static final Logger LOG = LoggerFactory.getLogger(GrpcLogAppender.class);
 
   private final RequestMap pendingRequests = new RequestMap();
@@ -80,7 +80,7 @@ public class GrpcLogAppender extends LogAppender {
   }
 
   @Override
-  protected GrpcService getServerRpc() {
+  public GrpcService getServerRpc() {
     return (GrpcService)super.getServerRpc();
   }
 
@@ -114,10 +114,18 @@ public class GrpcLogAppender extends LogAppender {
     getFollower().decreaseNextIndex(nextIndex);
   }
 
+  private boolean haveLogEntriesToSendOut() {
+    return shouldAppendEntries(getFollower().getNextIndex());
+  }
+
+  private boolean isFollowerCommitBehindLastCommitIndex() {
+    return getRaftLog().getLastCommittedIndex() > getFollower().getCommitIndex();
+  }
+
   @Override
-  protected void runAppenderImpl() throws IOException {
+  public void run() throws IOException {
     boolean installSnapshotRequired;
-    for(; isAppenderRunning(); mayWait()) {
+    for(; isRunning(); mayWait()) {
       installSnapshotRequired = false;
 
       //HB period is expired OR we have messages OR follower is behind with commit index
@@ -174,13 +182,13 @@ public class GrpcLogAppender extends LogAppender {
   }
 
   @Override
-  public void stopAppender() {
+  public void stop() {
     grpcServerMetrics.unregister();
-    super.stopAppender();
+    super.stop();
   }
 
   @Override
-  protected boolean shouldSendRequest() {
+  public boolean shouldSendRequest() {
     return appendLogRequestObserver == null || super.shouldSendRequest();
   }
 
@@ -203,7 +211,7 @@ public class GrpcLogAppender extends LogAppender {
       // prepare and enqueue the append request. note changes on follower's
       // nextIndex and ops on pendingRequests should always be associated
       // together and protected by the lock
-      pending = createRequest(callId++, excludeLogEntries);
+      pending = newAppendEntriesRequest(callId++, excludeLogEntries);
       if (pending == null) {
         return;
       }
@@ -216,7 +224,7 @@ public class GrpcLogAppender extends LogAppender {
       s = appendLogRequestObserver;
     }
 
-    if (isAppenderRunning()) {
+    if (isRunning()) {
       sendRequest(request, pending, s);
     }
   }
@@ -316,7 +324,7 @@ public class GrpcLogAppender extends LogAppender {
         default:
           throw new IllegalStateException("Unexpected reply result: " + reply.getResult());
       }
-      notifyAppend();
+      notifyLogAppender();
     }
 
     /**
@@ -324,7 +332,7 @@ public class GrpcLogAppender extends LogAppender {
      */
     @Override
     public void onError(Throwable t) {
-      if (!isAppenderRunning()) {
+      if (!isRunning()) {
         LOG.info("{} is stopped", GrpcLogAppender.this);
         return;
       }
@@ -382,7 +390,7 @@ public class GrpcLogAppender extends LogAppender {
 
     void close() {
       done.set(true);
-      GrpcLogAppender.this.notifyAppend();
+      notifyLogAppender();
     }
 
     synchronized boolean hasAllResponse() {
@@ -439,7 +447,7 @@ public class GrpcLogAppender extends LogAppender {
 
     @Override
     public void onError(Throwable t) {
-      if (!isAppenderRunning()) {
+      if (!isRunning()) {
         LOG.info("{} is stopped", GrpcLogAppender.this);
         return;
       }
@@ -477,7 +485,7 @@ public class GrpcLogAppender extends LogAppender {
     try {
       snapshotRequestObserver = getClient().installSnapshot(responseHandler);
       for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
-        if (isAppenderRunning()) {
+        if (isRunning()) {
           snapshotRequestObserver.onNext(request);
           getFollower().updateLastRpcSendTime();
           responseHandler.addPending(request);
@@ -496,7 +504,7 @@ public class GrpcLogAppender extends LogAppender {
     }
 
     synchronized (this) {
-      while (isAppenderRunning() && !responseHandler.isDone()) {
+      while (isRunning() && !responseHandler.isDone()) {
         try {
           wait();
         } catch (InterruptedException ignored) {
@@ -541,7 +549,7 @@ public class GrpcLogAppender extends LogAppender {
     }
 
     synchronized (this) {
-      if (isAppenderRunning() && !responseHandler.isDone()) {
+      if (isRunning() && !responseHandler.isDone()) {
         try {
           wait();
         } catch (InterruptedException ignored) {
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
index f024b81..6cdfd36 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
@@ -26,7 +26,7 @@ import org.apache.ratis.hadooprpc.server.HadoopRpcService;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.server.ServerFactory;
 
 public class HadoopFactory implements ServerFactory, ClientFactory {
   public static Parameters newRaftParameters(Configuration conf) {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
index 7f981ae..24f0142 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
@@ -25,7 +25,7 @@ import org.apache.ratis.netty.server.NettyRpcService;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.server.ServerFactory;
 
 public class NettyFactory implements ServerFactory, ClientFactory {
   public NettyFactory(Parameters parameters) {}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index b76705f..4ea4776 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -21,7 +21,6 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.*;
-import org.apache.ratis.rpc.RpcFactory;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.ServerImplUtils;
@@ -130,7 +129,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
   }
 
   /** @return the factory for creating server components. */
-  RpcFactory getFactory();
+  ServerFactory getFactory();
 
   /** Start this server. */
   void start() throws IOException;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/ServerFactory.java
similarity index 89%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
rename to ratis-server/src/main/java/org/apache/ratis/server/ServerFactory.java
index 12d5a74..600f02d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/ServerFactory.java
@@ -15,13 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.server.impl;
+package org.apache.ratis.server;
 
 import org.apache.ratis.rpc.RpcFactory;
-import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.LogAppenderBase;
 import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.leader.LeaderState;
+import org.apache.ratis.server.leader.LogAppender;
 
 /** A factory interface for creating server components. */
 public interface ServerFactory extends RpcFactory {
@@ -36,7 +36,7 @@ public interface ServerFactory extends RpcFactory {
 
   /** Create a new {@link LogAppender}. */
   default LogAppender newLogAppender(RaftServer.Division server, LeaderState state, FollowerInfo f) {
-    return new LogAppender(server, state, f);
+    return new LogAppenderBase(server, state, f);
   }
 
   RaftServerRpc newRaftServerRpc(RaftServer server);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 3d52e44..0352fb9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -35,6 +35,7 @@ import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.leader.LeaderState;
+import org.apache.ratis.server.leader.LogAppender;
 import org.apache.ratis.server.metrics.LogAppenderMetrics;
 import org.apache.ratis.server.metrics.RaftServerMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -284,7 +285,7 @@ class LeaderStateImpl implements LeaderState {
         server.getId().toString(), null);
     raftLog.append(placeHolder);
     processor.start();
-    senders.forEach(LogAppender::startAppender);
+    senders.forEach(LogAppender::start);
     return placeHolder;
   }
 
@@ -295,7 +296,7 @@ class LeaderStateImpl implements LeaderState {
   void stop() {
     this.running = false;
     // do not interrupt event processor since it may be in the middle of logSync
-    senders.forEach(LogAppender::stopAppender);
+    senders.forEach(LogAppender::stop);
     final NotLeaderException nle = server.generateNotLeaderException();
     final Collection<CommitInfoProto> commitInfos = server.getCommitInfos();
     try {
@@ -313,7 +314,7 @@ class LeaderStateImpl implements LeaderState {
   }
 
   void notifySenders() {
-    senders.forEach(LogAppender::notifyAppend);
+    senders.forEach(LogAppender::notifyLogAppender);
   }
 
   boolean inStagingState() {
@@ -466,7 +467,7 @@ class LeaderStateImpl implements LeaderState {
    * Update sender list for setConfiguration request
    */
   void addAndStartSenders(Collection<RaftPeer> newPeers) {
-    addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::startAppender);
+    addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::start);
   }
 
   Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) {
@@ -486,7 +487,7 @@ class LeaderStateImpl implements LeaderState {
 
   void stopAndRemoveSenders(Predicate<LogAppender> predicate) {
     final List<LogAppender> toStop = senders.stream().filter(predicate).collect(Collectors.toList());
-    toStop.forEach(LogAppender::stopAppender);
+    toStop.forEach(LogAppender::stop);
     senders.removeAll(toStop);
   }
 
@@ -494,7 +495,7 @@ class LeaderStateImpl implements LeaderState {
   public void restart(LogAppender sender) {
     final FollowerInfo follower = sender.getFollower();
     LOG.info("{}: Restarting {} for {}", this, JavaUtils.getClassSimpleName(sender.getClass()), follower.getName());
-    sender.stopAppender();
+    sender.stop();
     senders.removeAll(Collections.singleton(sender));
     addAndStartSenders(Collections.singleton(follower.getPeer()));
   }
@@ -504,7 +505,7 @@ class LeaderStateImpl implements LeaderState {
    */
   private void updateSenders(RaftConfiguration conf) {
     Preconditions.assertTrue(conf.isStable() && !inStagingState());
-    stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollower().getPeer().getId()));
+    stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollowerId()));
   }
 
   void submitStepDownEvent(StepDownReason reason) {
@@ -845,14 +846,14 @@ class LeaderStateImpl implements LeaderState {
   private List<List<RaftPeerId>> divideFollowers(RaftConfiguration conf) {
     List<List<RaftPeerId>> lists = new ArrayList<>(2);
     List<RaftPeerId> listForNew = senders.stream()
-        .filter(sender -> conf.containsInConf(sender.getFollower().getPeer().getId()))
-        .map(sender -> sender.getFollower().getPeer().getId())
+        .map(LogAppender::getFollowerId)
+        .filter(conf::containsInConf)
         .collect(Collectors.toList());
     lists.add(listForNew);
     if (conf.isTransitional()) {
       List<RaftPeerId> listForOld = senders.stream()
-          .filter(sender -> conf.containsInOldConf(sender.getFollower().getPeer().getId()))
-          .map(sender -> sender.getFollower().getPeer().getId())
+          .map(LogAppender::getFollowerId)
+          .filter(conf::containsInOldConf)
           .collect(Collectors.toList());
       lists.add(listForOld);
     }
@@ -923,7 +924,7 @@ class LeaderStateImpl implements LeaderState {
         .filter(sender -> sender.getFollower()
                                 .getLastRpcResponseTime()
                                 .elapsedTimeMs() <= server.getMaxTimeoutMs())
-        .map(sender -> sender.getFollower().getPeer().getId())
+        .map(LogAppender::getFollowerId)
         .collect(Collectors.toList());
 
     final RaftConfiguration conf = server.getRaftConf();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderBase.java
similarity index 74%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
rename to ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderBase.java
index a922e00..6b045fd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderBase.java
@@ -18,25 +18,35 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.leader.LeaderState;
+import org.apache.ratis.server.leader.LogAppender;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLog.EntryWithData;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
-import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.statemachine.SnapshotInfo;
-import org.apache.ratis.util.*;
+import org.apache.ratis.util.DataQueue;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.*;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
 
 import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
 import static org.apache.ratis.server.metrics.RaftLogMetrics.LOG_APPENDER_INSTALL_SNAPSHOT_METRIC;
@@ -44,31 +54,27 @@ import static org.apache.ratis.server.metrics.RaftLogMetrics.LOG_APPENDER_INSTAL
 /**
  * A daemon thread appending log entries to a follower peer.
  */
-public class LogAppender {
-  public static final Logger LOG = LoggerFactory.getLogger(LogAppender.class);
+public class LogAppenderBase implements LogAppender {
+  public static final Logger LOG = LoggerFactory.getLogger(LogAppenderBase.class);
 
   private final String name;
   private final RaftServer.Division server;
   private final LeaderState leaderState;
-  private final RaftLog raftLog;
   private final FollowerInfo follower;
 
   private final DataQueue<EntryWithData> buffer;
   private final int snapshotChunkMaxSize;
-  private final long halfMinTimeoutMs;
 
   private final LogAppenderDaemon daemon;
 
-  public LogAppender(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
+  public LogAppenderBase(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
     this.follower = f;
     this.name = follower.getName() + "-" + JavaUtils.getClassSimpleName(getClass());
     this.server = server;
     this.leaderState = leaderState;
-    this.raftLog = server.getRaftLog();
 
     final RaftProperties properties = server.getRaftServer().getProperties();
     this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt();
-    this.halfMinTimeoutMs = server.properties().minRpcTimeoutMs() / 2;
 
     final SizeInBytes bufferByteLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
     final int bufferElementLimit = RaftServerConfigKeys.Log.Appender.bufferElementLimit(properties);
@@ -76,44 +82,38 @@ public class LogAppender {
     this.daemon = new LogAppenderDaemon(this);
   }
 
-  protected RaftServer.Division getServer() {
+  @Override
+  public final RaftServer.Division getServer() {
     return server;
   }
 
-  protected RaftServerRpc getServerRpc() {
-    return server.getRaftServer().getServerRpc();
-  }
-
-  public RaftLog getRaftLog() {
-    return raftLog;
-  }
-
   @Override
   public String toString() {
     return name;
   }
 
-  void startAppender() {
+  @Override
+  public void start() {
     daemon.tryToStart();
   }
 
-  public boolean isAppenderRunning() {
+  @Override
+  public boolean isRunning() {
     return daemon.isWorking();
   }
 
-  public void stopAppender() {
+  @Override
+  public void stop() {
     daemon.tryToClose();
   }
 
-  public FollowerInfo getFollower() {
+  @Override
+  public final FollowerInfo getFollower() {
     return follower;
   }
 
-  protected RaftPeerId getFollowerId() {
-    return getFollower().getPeer().getId();
-  }
-
-  public LeaderState getLeaderState() {
+  @Override
+  public final LeaderState getLeaderState() {
     return leaderState;
   }
 
@@ -123,7 +123,7 @@ public class LogAppender {
     }
 
     final long previousIndex = nextIndex - 1;
-    final TermIndex previous = raftLog.getTermIndex(previousIndex);
+    final TermIndex previous = getRaftLog().getTermIndex(previousIndex);
     if (previous != null) {
       return previous;
     }
@@ -139,8 +139,8 @@ public class LogAppender {
     return null;
   }
 
-  protected AppendEntriesRequestProto createRequest(long callId,
-      boolean heartbeat) throws RaftLogIOException {
+  @Override
+  public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) throws RaftLogIOException {
     final TermIndex previous = getPrevious(follower.getNextIndex());
     final long snapshotIndex = follower.getSnapshotIndex();
     final long heartbeatRemainingMs = getHeartbeatRemainingTime();
@@ -151,11 +151,11 @@ public class LogAppender {
 
     Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + buffer.getNumElements() + " elements.");
 
-    final long leaderNext = raftLog.getNextIndex();
+    final long leaderNext = getRaftLog().getNextIndex();
     final long followerNext = follower.getNextIndex();
     final long halfMs = heartbeatRemainingMs/2;
     for (long next = followerNext; leaderNext > next && getHeartbeatRemainingTime() - halfMs > 0; ) {
-      if (!buffer.offer(raftLog.getEntryWithData(next++))) {
+      if (!buffer.offer(getRaftLog().getEntryWithData(next++))) {
         break;
       }
     }
@@ -195,16 +195,16 @@ public class LogAppender {
       throws InterruptedException, InterruptedIOException, RaftLogIOException {
     int retry = 0;
     AppendEntriesRequestProto request = null;
-    while (isAppenderRunning()) { // keep retrying for IOException
+    while (isRunning()) { // keep retrying for IOException
       try {
         if (request == null || request.getEntriesCount() == 0) {
-          request = createRequest(DEFAULT_CALLID, false);
+          request = newAppendEntriesRequest(DEFAULT_CALLID, false);
         }
 
         if (request == null) {
           LOG.trace("{} no entries to send now, wait ...", this);
           return null;
-        } else if (!isAppenderRunning()) {
+        } else if (!isRunning()) {
           LOG.info("{} is stopped. Skip appendEntries.", this);
           return null;
         }
@@ -224,14 +224,15 @@ public class LogAppender {
         }
         handleException(ioe);
       }
-      if (isAppenderRunning()) {
+      if (isRunning()) {
         server.properties().rpcSleepTime().sleep();
       }
     }
     return null;
   }
 
-  protected InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
+  @Override
+  public InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
     Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() > 0);
     synchronized (server) {
       return ServerProtoUtils.toInstallSnapshotRequestProto(server.getMemberId(), getFollowerId(),
@@ -239,8 +240,8 @@ public class LogAppender {
     }
   }
 
-  protected Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(
-      String requestId, SnapshotInfo snapshot) {
+  @Override
+  public Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot) {
     return ServerImplUtils.newInstallSnapshotRequests(server, getFollowerId(), requestId,
         snapshot, snapshotChunkMaxSize);
   }
@@ -274,29 +275,14 @@ public class LogAppender {
     return reply;
   }
 
-  protected SnapshotInfo shouldInstallSnapshot() {
-    final long logStartIndex = raftLog.getStartIndex();
-    // we should install snapshot if the follower needs to catch up and:
-    // 1. there is no local log entry but there is snapshot
-    // 2. or the follower's next index is smaller than the log start index
-    if (follower.getNextIndex() < raftLog.getNextIndex()) {
-      final SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
-      if (follower.getNextIndex() < logStartIndex ||
-          (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
-        return snapshot;
-      }
-    }
-    return null;
-  }
-
-  /** Check and send appendEntries RPC */
-  protected void runAppenderImpl() throws InterruptedException, IOException {
-    while (isAppenderRunning()) {
+  @Override
+  public void run() throws InterruptedException, IOException {
+    while (isRunning()) {
       if (shouldSendRequest()) {
         SnapshotInfo snapshot = shouldInstallSnapshot();
         if (snapshot != null) {
           LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, send snapshot {} to follower",
-              this, follower.getNextIndex(), raftLog.getStartIndex(), snapshot);
+              this, follower.getNextIndex(), getRaftLog().getStartIndex(), snapshot);
 
           final InstallSnapshotReplyProto r = installSnapshot(snapshot);
           if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
@@ -309,7 +295,7 @@ public class LogAppender {
           }
         }
       }
-      if (isAppenderRunning() && !shouldAppendEntries(follower.getNextIndex())) {
+      if (isRunning() && !shouldAppendEntries(follower.getNextIndex())) {
         final long waitTime = getHeartbeatRemainingTime();
         if (waitTime > 0) {
           synchronized (this) {
@@ -358,42 +344,4 @@ public class LogAppender {
     LOG.trace("TRACE", e);
     getServerRpc().handleException(getFollowerId(), e, false);
   }
-
-  public synchronized void notifyAppend() {
-    this.notify();
-  }
-
-  /** Should the leader send appendEntries RPC to this follower? */
-  protected boolean shouldSendRequest() {
-    return shouldAppendEntries(follower.getNextIndex()) || heartbeatTimeout();
-  }
-
-  protected boolean haveLogEntriesToSendOut() {
-    return shouldAppendEntries(follower.getNextIndex());
-  }
-
-  protected boolean isFollowerCommitBehindLastCommitIndex() {
-    return raftLog.getLastCommittedIndex() > follower.getCommitIndex();
-  }
-
-  private boolean shouldAppendEntries(long followerIndex) {
-    return followerIndex < raftLog.getNextIndex();
-  }
-
-  protected boolean heartbeatTimeout() {
-    return getHeartbeatRemainingTime() <= 0;
-  }
-
-  /**
-   * @return the time in milliseconds that the leader should send a heartbeat.
-   */
-  protected long getHeartbeatRemainingTime() {
-    return halfMinTimeoutMs - follower.getLastRpcTime().elapsedTimeMs();
-  }
-
-  protected boolean checkResponseTerm(long responseTerm) {
-    synchronized (server) {
-      return isAppenderRunning() && leaderState.onFollowerTerm(follower, responseTerm);
-    }
-  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
index 4d0a662..4cecc9c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.server.leader.LogAppender;
 import org.apache.ratis.util.Daemon;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
@@ -74,7 +75,7 @@ class LogAppenderDaemon {
   private void run() {
     try {
       if (lifeCycle.transition(TRY_TO_RUN) == RUNNING) {
-        logAppender.runAppenderImpl();
+        logAppender.run();
       }
       lifeCycle.compareAndTransition(RUNNING, CLOSING);
     } catch (InterruptedException e) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 3bca7a0..254087f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -41,6 +41,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerMXBean;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.leader.LeaderState;
+import org.apache.ratis.server.leader.LogAppender;
 import org.apache.ratis.server.metrics.LeaderElectionMetrics;
 import org.apache.ratis.server.metrics.RaftServerMetrics;
 import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 834b90a..f285d57 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -35,6 +35,7 @@ import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.ServerFactory;
 import org.apache.ratis.util.JvmPauseMonitor;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java
index 24649d7..5460392 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java
@@ -19,7 +19,6 @@ package org.apache.ratis.server.leader;
 
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.server.impl.LogAppender;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.util.JavaUtils;
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppender.java
new file mode 100644
index 0000000..843e6ca
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.leader;
+
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A {@link LogAppender} is for the leader to send appendEntries to a particular follower.
+ */
+public interface LogAppender {
+  Logger LOG = LoggerFactory.getLogger(LogAppender.class);
+
+  RaftServer.Division getServer();
+
+  default RaftServerRpc getServerRpc() {
+    return getServer().getRaftServer().getServerRpc();
+  }
+
+  default RaftLog getRaftLog() {
+    return getServer().getRaftLog();
+  }
+
+  void start();
+
+  boolean isRunning();
+
+  void stop();
+
+  LeaderState getLeaderState();
+
+  FollowerInfo getFollower();
+
+  default RaftPeerId getFollowerId() {
+    return getFollower().getPeer().getId();
+  }
+
+  AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) throws RaftLogIOException;
+
+  InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex);
+
+  Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot);
+
+  default SnapshotInfo shouldInstallSnapshot() {
+    // we should install snapshot if the follower needs to catch up and:
+    // 1. there is no local log entry but there is snapshot
+    // 2. or the follower's next index is smaller than the log start index
+    final long followerNextIndex = getFollower().getNextIndex();
+    if (followerNextIndex < getRaftLog().getNextIndex()) {
+      final long logStartIndex = getRaftLog().getStartIndex();
+      final SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot();
+      if (followerNextIndex < logStartIndex || (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
+        return snapshot;
+      }
+    }
+    return null;
+  }
+
+  void run() throws InterruptedException, IOException;
+
+  default void notifyLogAppender() {
+    synchronized (this) {
+      notify();
+    }
+  }
+
+  /** Should the leader send appendEntries RPC to this follower? */
+  default boolean shouldSendRequest() {
+    return shouldAppendEntries(getFollower().getNextIndex()) || heartbeatTimeout();
+  }
+
+  default boolean shouldAppendEntries(long followerIndex) {
+    return followerIndex < getRaftLog().getNextIndex();
+  }
+
+  default boolean heartbeatTimeout() {
+    return getHeartbeatRemainingTime() <= 0;
+  }
+
+  /**
+   * @return the time in milliseconds that the leader should send a heartbeat.
+   */
+  default long getHeartbeatRemainingTime() {
+    return getServer().properties().minRpcTimeoutMs()/2 - getFollower().getLastRpcTime().elapsedTimeMs();
+  }
+
+  default boolean checkResponseTerm(long responseTerm) {
+    synchronized (getServer()) {
+      return isRunning() && getLeaderState().onFollowerTerm(getFollower(), responseTerm);
+    }
+  }
+}
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index ab37bba..f9dec53 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -30,7 +30,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.LogAppender;
+import org.apache.ratis.server.leader.LogAppender;
 import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.server.metrics.RaftServerMetrics;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 3a52585..2167f7e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -35,6 +35,7 @@ import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.ServerFactory;
 import org.apache.ratis.server.raftlog.memory.MemoryRaftLog;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.statemachine.StateMachine;
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index f216189..8e09b8d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -30,6 +30,7 @@ import org.apache.ratis.server.DataStreamMap;
 import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.leader.LogAppender;
 import org.apache.ratis.server.metrics.RaftServerMetrics;
 import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
index f5ee907..0399b41 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
@@ -23,7 +23,7 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.server.ServerFactory;
 import org.apache.ratis.util.JavaUtils;
 
 import java.util.Objects;
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index e25c9d2..6af26bf 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -374,7 +374,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
   @Override
   public CompletableFuture<ByteString> read(LogEntryProto entry) {
     return blocking.getFuture(Blocking.Type.READ_STATE_MACHINE_DATA)
-                   .thenApply((v) -> null);
+        .thenApply(v -> STATE_MACHINE_DATA);
   }
 
   @Override
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 0533323..da44b68 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -51,13 +51,12 @@ import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.SetConfigurationRequest;
-import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.DataStreamMap;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.DataStreamServerImpl;
 import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
-import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.server.ServerFactory;
 import org.apache.ratis.server.metrics.RaftServerMetrics;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.storage.RaftStorage;