You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/07 08:49:26 UTC

[incubator-ratis] branch master updated: RATIS-1214. Separate the default LogAppender implementation from LogAppenderBase. (#331)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 80d857b  RATIS-1214. Separate the default LogAppender implementation from LogAppenderBase. (#331)
80d857b is described below

commit 80d857b7418855c4653487e8fe7fe77dccbb3afd
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Dec 7 16:49:18 2020 +0800

    RATIS-1214. Separate the default LogAppender implementation from LogAppenderBase. (#331)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  27 ++-
 .../org/apache/ratis/server/ServerFactory.java     |   3 +-
 .../apache/ratis/server/impl/ServerImplUtils.java  |   8 -
 .../{impl => leader}/InstallSnapshotRequests.java  |   3 +-
 .../apache/ratis/server/leader/LogAppender.java    |  61 +++++--
 .../server/{impl => leader}/LogAppenderBase.java   | 174 +------------------
 .../server/{impl => leader}/LogAppenderDaemon.java |   3 +-
 .../ratis/server/leader/LogAppenderDefault.java    | 185 +++++++++++++++++++++
 8 files changed, 261 insertions(+), 203 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 26329db..579e1cb 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -24,10 +24,10 @@ import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.LogAppenderBase;
 import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.leader.LeaderState;
 import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.leader.LogAppenderBase;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
@@ -114,10 +114,6 @@ public class GrpcLogAppender extends LogAppenderBase {
     getFollower().decreaseNextIndex(nextIndex);
   }
 
-  private boolean haveLogEntriesToSendOut() {
-    return shouldAppendEntries(getFollower().getNextIndex());
-  }
-
   private boolean isFollowerCommitBehindLastCommitIndex() {
     return getRaftLog().getLastCommittedIndex() > getFollower().getCommitIndex();
   }
@@ -129,7 +125,7 @@ public class GrpcLogAppender extends LogAppenderBase {
       installSnapshotRequired = false;
 
       //HB period is expired OR we have messages OR follower is behind with commit index
-      if (haveLogEntriesToSendOut() || heartbeatTimeout() || isFollowerCommitBehindLastCommitIndex()) {
+      if (shouldSendAppendEntries() || isFollowerCommitBehindLastCommitIndex()) {
 
         if (installSnapshotEnabled) {
           SnapshotInfo snapshot = shouldInstallSnapshot();
@@ -156,11 +152,11 @@ public class GrpcLogAppender extends LogAppenderBase {
 
   private long getWaitTimeMs() {
     if (haveTooManyPendingRequests()) {
-      return getHeartbeatRemainingTime(); // Should wait for a short time
-    } else if (haveLogEntriesToSendOut() || heartbeatTimeout()) {
+      return getHeartbeatRemainingTimeMs(); // Should wait for a short time
+    } else if (shouldSendAppendEntries()) {
       return 0L;
     }
-    return Math.min(10L, getHeartbeatRemainingTime());
+    return Math.min(10L, getHeartbeatRemainingTimeMs());
   }
 
   private void mayWait() {
@@ -188,8 +184,8 @@ public class GrpcLogAppender extends LogAppenderBase {
   }
 
   @Override
-  public boolean shouldSendRequest() {
-    return appendLogRequestObserver == null || super.shouldSendRequest();
+  public boolean shouldSendAppendEntries() {
+    return appendLogRequestObserver == null || super.shouldSendAppendEntries();
   }
 
   /**
@@ -313,7 +309,7 @@ public class GrpcLogAppender extends LogAppenderBase {
           break;
         case NOT_LEADER:
           grpcServerMetrics.onRequestNotLeader(getFollowerId().toString());
-          if (checkResponseTerm(reply.getTerm())) {
+          if (onFollowerTerm(reply.getTerm())) {
             return;
           }
           break;
@@ -429,7 +425,7 @@ public class GrpcLogAppender extends LogAppenderBase {
           removePending(reply);
           break;
         case NOT_LEADER:
-          checkResponseTerm(reply.getTerm());
+          onFollowerTerm(reply.getTerm());
           break;
         case CONF_MISMATCH:
           LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}",
@@ -565,12 +561,13 @@ public class GrpcLogAppender extends LogAppenderBase {
    * @return the first available log's start term index
    */
   private TermIndex shouldNotifyToInstallSnapshot() {
-    if (getFollower().getNextIndex() < getRaftLog().getStartIndex()) {
+    final long leaderStartIndex = getRaftLog().getStartIndex();
+    if (getFollower().getNextIndex() < leaderStartIndex) {
       // The Leader does not have the logs from the Follower's last log
       // index onwards. And install snapshot is disabled. So the Follower
       // should be notified to install the latest snapshot through its
       // State Machine.
-      return getRaftLog().getTermIndex(getRaftLog().getStartIndex());
+      return getRaftLog().getTermIndex(leaderStartIndex);
     }
     return null;
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/ServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/ServerFactory.java
index 600f02d..b788bc9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/ServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/ServerFactory.java
@@ -18,7 +18,6 @@
 package org.apache.ratis.server;
 
 import org.apache.ratis.rpc.RpcFactory;
-import org.apache.ratis.server.impl.LogAppenderBase;
 import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.leader.LeaderState;
 import org.apache.ratis.server.leader.LogAppender;
@@ -36,7 +35,7 @@ public interface ServerFactory extends RpcFactory {
 
   /** Create a new {@link LogAppender}. */
   default LogAppender newLogAppender(RaftServer.Division server, LeaderState state, FollowerInfo f) {
-    return new LogAppenderBase(server, state, f);
+    return LogAppender.newLogAppenderDefault(server, state, f);
   }
 
   RaftServerRpc newRaftServerRpc(RaftServer server);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 1117ceb..db452da 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -19,13 +19,11 @@ package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
@@ -68,12 +66,6 @@ public final class ServerImplUtils {
     return proxy;
   }
 
-  public static Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(RaftServer.Division server,
-      RaftPeerId targetId, String requestId, SnapshotInfo snapshot, int snapshotChunkMaxSize) {
-    return new InstallSnapshotRequests(server, targetId, requestId, snapshot, snapshotChunkMaxSize);
-  }
-
-
   static long effectiveCommitIndex(long leaderCommitIndex, TermIndex followerPrevious, int numAppendEntries) {
     final long p = Optional.ofNullable(followerPrevious).map(TermIndex::getIndex).orElse(RaftLog.LEAST_VALID_LOG_INDEX);
     return Math.min(leaderCommitIndex, p + numAppendEntries);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/InstallSnapshotRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
similarity index 98%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/InstallSnapshotRequests.java
rename to ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
index 448770a..31e32dc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/InstallSnapshotRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
@@ -15,12 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.server.impl;
+package org.apache.ratis.server.leader;
 
 import org.apache.ratis.proto.RaftProtos.FileChunkProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.storage.FileChunkReader;
 import org.apache.ratis.server.storage.FileInfo;
 import org.apache.ratis.statemachine.SnapshotInfo;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index 843e6ca..483ef33 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -37,36 +37,67 @@ import java.io.IOException;
 public interface LogAppender {
   Logger LOG = LoggerFactory.getLogger(LogAppender.class);
 
+  /** Create the default {@link LogAppender}. */
+  static LogAppender newLogAppenderDefault(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
+    return new LogAppenderDefault(server, leaderState, f);
+  }
+
+  /** @return the server. */
   RaftServer.Division getServer();
 
+  /** The same as getServer().getRaftServer().getServerRpc(). */
   default RaftServerRpc getServerRpc() {
     return getServer().getRaftServer().getServerRpc();
   }
 
+  /** The same as getServer().getRaftLog(). */
   default RaftLog getRaftLog() {
     return getServer().getRaftLog();
   }
 
+  /** Start this {@link LogAppender}. */
   void start();
 
+  /** Is this {@link LogAppender} running? */
   boolean isRunning();
 
+  /** Stop this {@link LogAppender}. */
   void stop();
 
+  /** @return the leader state. */
   LeaderState getLeaderState();
 
+  /** @return the follower information for this {@link LogAppender}. */
   FollowerInfo getFollower();
 
+  /** The same as getFollower().getPeer().getId(). */
   default RaftPeerId getFollowerId() {
     return getFollower().getPeer().getId();
   }
 
+  /**
+   * Create an {@link AppendEntriesRequestProto} object using the {@link FollowerInfo} of this {@link LogAppender}.
+   * The {@link AppendEntriesRequestProto} object may contain zero or more log entries.
+   * When there are zero log entries, the {@link AppendEntriesRequestProto} object is a heartbeat.
+   *
+   * @param callId The call id of the returned request.
+   * @param heartbeat if true, the returned request must be a heartbeat.
+   *
+   * @return a new {@link AppendEntriesRequestProto} object.
+   */
   AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) throws RaftLogIOException;
 
+  /** @return a new {@link InstallSnapshotRequestProto} object. */
   InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex);
 
+  /** @return an {@link Iterable} of {@link InstallSnapshotRequestProto} for sending the given snapshot. */
   Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot);
 
+  /**
+   * Should this {@link LogAppender} send a snapshot to the follower?
+   *
+   * @return the snapshot if it should install a snapshot; otherwise, return null.
+   */
   default SnapshotInfo shouldInstallSnapshot() {
     // we should install snapshot if the follower needs to catch up and:
     // 1. there is no local log entry but there is snapshot
@@ -82,37 +113,47 @@ public interface LogAppender {
     return null;
   }
 
+  /** Define how this {@link LogAppender} should run. */
   void run() throws InterruptedException, IOException;
 
+  /**
+   * Similar to {@link #notify()}, wake up this {@link LogAppender} for an event, which can be:
+   * (1) new log entries available,
+   * (2) log indices changed, or
+   * (3) a snapshot installation completed.
+   */
   default void notifyLogAppender() {
     synchronized (this) {
       notify();
     }
   }
 
-  /** Should the leader send appendEntries RPC to this follower? */
-  default boolean shouldSendRequest() {
-    return shouldAppendEntries(getFollower().getNextIndex()) || heartbeatTimeout();
+  /** Should the leader send appendEntries RPC to the follower? */
+  default boolean shouldSendAppendEntries() {
+    return hasAppendEntries() || shouldHeartbeat();
   }
 
-  default boolean shouldAppendEntries(long followerIndex) {
-    return followerIndex < getRaftLog().getNextIndex();
+  /** Does it have outstanding appendEntries? */
+  default boolean hasAppendEntries() {
+    return getFollower().getNextIndex() < getRaftLog().getNextIndex();
   }
 
-  default boolean heartbeatTimeout() {
-    return getHeartbeatRemainingTime() <= 0;
+  /** The same as getHeartbeatRemainingTimeMs() <= 0. */
+  default boolean shouldHeartbeat() {
+    return getHeartbeatRemainingTimeMs() <= 0;
   }
 
   /**
    * @return the time in milliseconds that the leader should send a heartbeat.
    */
-  default long getHeartbeatRemainingTime() {
+  default long getHeartbeatRemainingTimeMs() {
     return getServer().properties().minRpcTimeoutMs()/2 - getFollower().getLastRpcTime().elapsedTimeMs();
   }
 
-  default boolean checkResponseTerm(long responseTerm) {
+  /** Handle the event that the follower has replied with a term. */
+  default boolean onFollowerTerm(long followerTerm) {
     synchronized (getServer()) {
-      return isRunning() && getLeaderState().onFollowerTerm(getFollower(), responseTerm);
+      return isRunning() && getLeaderState().onFollowerTerm(getFollower(), followerTerm);
     }
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
similarity index 51%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderBase.java
rename to ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index 6b045fd..43a84c0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -15,20 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.server.impl;
+package org.apache.ratis.server.leader;
 
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.leader.FollowerInfo;
-import org.apache.ratis.server.leader.LeaderState;
-import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLog.EntryWithData;
@@ -38,25 +33,15 @@ import org.apache.ratis.util.DataQueue;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.SizeInBytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
-import java.util.UUID;
-
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
-import static org.apache.ratis.server.metrics.RaftLogMetrics.LOG_APPENDER_INSTALL_SNAPSHOT_METRIC;
 
 /**
- * A daemon thread appending log entries to a follower peer.
+ * An abstract implementation of {@link LogAppender}.
  */
-public class LogAppenderBase implements LogAppender {
-  public static final Logger LOG = LoggerFactory.getLogger(LogAppenderBase.class);
-
+public abstract class LogAppenderBase implements LogAppender {
   private final String name;
   private final RaftServer.Division server;
   private final LeaderState leaderState;
@@ -67,7 +52,7 @@ public class LogAppenderBase implements LogAppender {
 
   private final LogAppenderDaemon daemon;
 
-  public LogAppenderBase(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
+  protected LogAppenderBase(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
     this.follower = f;
     this.name = follower.getName() + "-" + JavaUtils.getClassSimpleName(getClass());
     this.server = server;
@@ -143,7 +128,7 @@ public class LogAppenderBase implements LogAppender {
   public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) throws RaftLogIOException {
     final TermIndex previous = getPrevious(follower.getNextIndex());
     final long snapshotIndex = follower.getSnapshotIndex();
-    final long heartbeatRemainingMs = getHeartbeatRemainingTime();
+    final long heartbeatRemainingMs = getHeartbeatRemainingTimeMs();
     if (heartbeatRemainingMs <= 0L || heartbeat) {
       // heartbeat
       return leaderState.newAppendEntriesRequestProto(follower, Collections.emptyList(), previous, callId);
@@ -154,7 +139,7 @@ public class LogAppenderBase implements LogAppender {
     final long leaderNext = getRaftLog().getNextIndex();
     final long followerNext = follower.getNextIndex();
     final long halfMs = heartbeatRemainingMs/2;
-    for (long next = followerNext; leaderNext > next && getHeartbeatRemainingTime() - halfMs > 0; ) {
+    for (long next = followerNext; leaderNext > next && getHeartbeatRemainingTimeMs() - halfMs > 0; ) {
       if (!buffer.offer(getRaftLog().getEntryWithData(next++))) {
         break;
       }
@@ -163,7 +148,7 @@ public class LogAppenderBase implements LogAppender {
       return null;
     }
 
-    final List<LogEntryProto> protos = buffer.pollList(getHeartbeatRemainingTime(), EntryWithData::getEntry,
+    final List<LogEntryProto> protos = buffer.pollList(getHeartbeatRemainingTimeMs(), EntryWithData::getEntry,
         (entry, time, exception) -> LOG.warn("{}: Failed to get {} in {}: {}",
             follower.getName(), entry, time, exception));
     buffer.clear();
@@ -190,47 +175,6 @@ public class LogAppenderBase implements LogAppender {
     }
   }
 
-  /** Send an appendEntries RPC; retry indefinitely. */
-  private AppendEntriesReplyProto sendAppendEntriesWithRetries()
-      throws InterruptedException, InterruptedIOException, RaftLogIOException {
-    int retry = 0;
-    AppendEntriesRequestProto request = null;
-    while (isRunning()) { // keep retrying for IOException
-      try {
-        if (request == null || request.getEntriesCount() == 0) {
-          request = newAppendEntriesRequest(DEFAULT_CALLID, false);
-        }
-
-        if (request == null) {
-          LOG.trace("{} no entries to send now, wait ...", this);
-          return null;
-        } else if (!isRunning()) {
-          LOG.info("{} is stopped. Skip appendEntries.", this);
-          return null;
-        }
-
-        follower.updateLastRpcSendTime();
-        final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
-        follower.updateLastRpcResponseTime();
-
-        getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit());
-        return r;
-      } catch (InterruptedIOException | RaftLogIOException e) {
-        throw e;
-      } catch (IOException ioe) {
-        // TODO should have more detailed retry policy here.
-        if (retry++ % 10 == 0) { // to reduce the number of messages
-          LOG.warn("{}: Failed to appendEntries (retry={}): {}", this, retry++, ioe);
-        }
-        handleException(ioe);
-      }
-      if (isRunning()) {
-        server.properties().rpcSleepTime().sleep();
-      }
-    }
-    return null;
-  }
-
   @Override
   public InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
     Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() > 0);
@@ -242,106 +186,6 @@ public class LogAppenderBase implements LogAppender {
 
   @Override
   public Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot) {
-    return ServerImplUtils.newInstallSnapshotRequests(server, getFollowerId(), requestId,
-        snapshot, snapshotChunkMaxSize);
-  }
-
-  private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws InterruptedIOException {
-    String requestId = UUID.randomUUID().toString();
-    InstallSnapshotReplyProto reply = null;
-    try {
-      for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
-        follower.updateLastRpcSendTime();
-        reply = getServerRpc().installSnapshot(request);
-        follower.updateLastRpcResponseTime();
-
-        if (!reply.getServerReply().getSuccess()) {
-          return reply;
-        }
-      }
-    } catch (InterruptedIOException iioe) {
-      throw iioe;
-    } catch (Exception ioe) {
-      LOG.warn("{}: Failed to installSnapshot {}: {}", this, snapshot, ioe);
-      handleException(ioe);
-      return null;
-    }
-
-    if (reply != null) {
-      follower.setSnapshotIndex(snapshot.getTermIndex().getIndex());
-      LOG.info("{}: installSnapshot {} successfully", this, snapshot);
-      server.getRaftServerMetrics().getCounter(LOG_APPENDER_INSTALL_SNAPSHOT_METRIC).inc();
-    }
-    return reply;
-  }
-
-  @Override
-  public void run() throws InterruptedException, IOException {
-    while (isRunning()) {
-      if (shouldSendRequest()) {
-        SnapshotInfo snapshot = shouldInstallSnapshot();
-        if (snapshot != null) {
-          LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, send snapshot {} to follower",
-              this, follower.getNextIndex(), getRaftLog().getStartIndex(), snapshot);
-
-          final InstallSnapshotReplyProto r = installSnapshot(snapshot);
-          if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
-            checkResponseTerm(r.getTerm());
-          } // otherwise if r is null, retry the snapshot installation
-        } else {
-          final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
-          if (r != null) {
-            handleReply(r);
-          }
-        }
-      }
-      if (isRunning() && !shouldAppendEntries(follower.getNextIndex())) {
-        final long waitTime = getHeartbeatRemainingTime();
-        if (waitTime > 0) {
-          synchronized (this) {
-            wait(waitTime);
-          }
-        }
-      }
-      getLeaderState().checkHealth(getFollower());
-    }
-  }
-
-  private void handleReply(AppendEntriesReplyProto reply) throws IllegalArgumentException {
-    if (reply != null) {
-      switch (reply.getResult()) {
-        case SUCCESS:
-          final long oldNextIndex = follower.getNextIndex();
-          final long nextIndex = reply.getNextIndex();
-          if (nextIndex < oldNextIndex) {
-            throw new IllegalStateException("nextIndex=" + nextIndex
-                + " < oldNextIndex=" + oldNextIndex
-                + ", reply=" + ServerProtoUtils.toString(reply));
-          }
-
-          if (nextIndex > oldNextIndex) {
-            follower.updateMatchIndex(nextIndex - 1);
-            follower.increaseNextIndex(nextIndex);
-            getLeaderState().onFollowerSuccessAppendEntries(getFollower());
-          }
-          break;
-        case NOT_LEADER:
-          // check if should step down
-          checkResponseTerm(reply.getTerm());
-          break;
-        case INCONSISTENCY:
-          follower.decreaseNextIndex(reply.getNextIndex());
-          break;
-        case UNRECOGNIZED:
-          LOG.warn("{}: received {}", this, reply.getResult());
-          break;
-        default: throw new IllegalArgumentException("Unable to process result " + reply.getResult());
-      }
-    }
-  }
-
-  private void handleException(Exception e) {
-    LOG.trace("TRACE", e);
-    getServerRpc().handleException(getFollowerId(), e, false);
+    return new InstallSnapshotRequests(server, getFollowerId(), requestId, snapshot, snapshotChunkMaxSize);
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
similarity index 97%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
rename to ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
index 4cecc9c..6b2d607 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
@@ -15,9 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.server.impl;
+package org.apache.ratis.server.leader;
 
-import org.apache.ratis.server.leader.LogAppender;
 import org.apache.ratis.util.Daemon;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
new file mode 100644
index 0000000..5eb85d8
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.leader;
+
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
+import org.apache.ratis.statemachine.SnapshotInfo;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.UUID;
+
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.LOG_APPENDER_INSTALL_SNAPSHOT_METRIC;
+
+/**
+ * The default implementation of {@link LogAppender}
+ * using {@link org.apache.ratis.server.protocol.RaftServerProtocol}.
+ */
+class LogAppenderDefault extends LogAppenderBase {
+  LogAppenderDefault(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
+    super(server, leaderState, f);
+  }
+
+  /** Send an appendEntries RPC; retry indefinitely. */
+  private AppendEntriesReplyProto sendAppendEntriesWithRetries()
+      throws InterruptedException, InterruptedIOException, RaftLogIOException {
+    int retry = 0;
+    AppendEntriesRequestProto request = null;
+    while (isRunning()) { // keep retrying for IOException
+      try {
+        if (request == null || request.getEntriesCount() == 0) {
+          request = newAppendEntriesRequest(DEFAULT_CALLID, false);
+        }
+
+        if (request == null) {
+          LOG.trace("{} no entries to send now, wait ...", this);
+          return null;
+        } else if (!isRunning()) {
+          LOG.info("{} is stopped. Skip appendEntries.", this);
+          return null;
+        }
+
+        getFollower().updateLastRpcSendTime();
+        final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
+        getFollower().updateLastRpcResponseTime();
+
+        getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit());
+        return r;
+      } catch (InterruptedIOException | RaftLogIOException e) {
+        throw e;
+      } catch (IOException ioe) {
+        // TODO should have more detailed retry policy here.
+        if (retry++ % 10 == 0) { // to reduce the number of messages
+          LOG.warn("{}: Failed to appendEntries (retry={}): {}", this, retry++, ioe);
+        }
+        handleException(ioe);
+      }
+      if (isRunning()) {
+        getServer().properties().rpcSleepTime().sleep();
+      }
+    }
+    return null;
+  }
+
+  private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws InterruptedIOException {
+    String requestId = UUID.randomUUID().toString();
+    InstallSnapshotReplyProto reply = null;
+    try {
+      for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
+        getFollower().updateLastRpcSendTime();
+        reply = getServerRpc().installSnapshot(request);
+        getFollower().updateLastRpcResponseTime();
+
+        if (!reply.getServerReply().getSuccess()) {
+          return reply;
+        }
+      }
+    } catch (InterruptedIOException iioe) {
+      throw iioe;
+    } catch (Exception ioe) {
+      LOG.warn("{}: Failed to installSnapshot {}: {}", this, snapshot, ioe);
+      handleException(ioe);
+      return null;
+    }
+
+    if (reply != null) {
+      getFollower().setSnapshotIndex(snapshot.getTermIndex().getIndex());
+      LOG.info("{}: installSnapshot {} successfully", this, snapshot);
+      getServer().getRaftServerMetrics().getCounter(LOG_APPENDER_INSTALL_SNAPSHOT_METRIC).inc();
+    }
+    return reply;
+  }
+
+  @Override
+  public void run() throws InterruptedException, IOException {
+    while (isRunning()) {
+      if (shouldSendAppendEntries()) {
+        SnapshotInfo snapshot = shouldInstallSnapshot();
+        if (snapshot != null) {
+          LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, send snapshot {} to follower",
+              this, getFollower().getNextIndex(), getRaftLog().getStartIndex(), snapshot);
+
+          final InstallSnapshotReplyProto r = installSnapshot(snapshot);
+          if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
+            onFollowerTerm(r.getTerm());
+          } // otherwise if r is null, retry the snapshot installation
+        } else {
+          final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
+          if (r != null) {
+            handleReply(r);
+          }
+        }
+      }
+      if (isRunning() && !hasAppendEntries()) {
+        final long waitTime = getHeartbeatRemainingTimeMs();
+        if (waitTime > 0) {
+          synchronized (this) {
+            wait(waitTime);
+          }
+        }
+      }
+      getLeaderState().checkHealth(getFollower());
+    }
+  }
+
+  private void handleReply(AppendEntriesReplyProto reply) throws IllegalArgumentException {
+    if (reply != null) {
+      switch (reply.getResult()) {
+        case SUCCESS:
+          final long oldNextIndex = getFollower().getNextIndex();
+          final long nextIndex = reply.getNextIndex();
+          if (nextIndex < oldNextIndex) {
+            throw new IllegalStateException("nextIndex=" + nextIndex
+                + " < oldNextIndex=" + oldNextIndex
+                + ", reply=" + ServerProtoUtils.toString(reply));
+          }
+
+          if (nextIndex > oldNextIndex) {
+            getFollower().updateMatchIndex(nextIndex - 1);
+            getFollower().increaseNextIndex(nextIndex);
+            getLeaderState().onFollowerSuccessAppendEntries(getFollower());
+          }
+          break;
+        case NOT_LEADER:
+          // check if should step down
+          onFollowerTerm(reply.getTerm());
+          break;
+        case INCONSISTENCY:
+          getFollower().decreaseNextIndex(reply.getNextIndex());
+          break;
+        case UNRECOGNIZED:
+          LOG.warn("{}: received {}", this, reply.getResult());
+          break;
+        default: throw new IllegalArgumentException("Unable to process result " + reply.getResult());
+      }
+    }
+  }
+
+  private void handleException(Exception e) {
+    LOG.trace("TRACE", e);
+    getServerRpc().handleException(getFollowerId(), e, false);
+  }
+}