You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/01/31 21:17:06 UTC

[21/54] [abbrv] incubator-ratis git commit: Renamed the packages from raft to ratis in preparation for Apache Incubation - Moved all java packages from org.apache.raft to org.apache.ratis. - Moved native package to org_apache_ratis, and native lib to l

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
new file mode 100644
index 0000000..7b3845a
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.Timestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+class LeaderElection extends Daemon {
+  public static final Logger LOG = LoggerFactory.getLogger(LeaderElection.class);
+
+  private ResultAndTerm logAndReturn(Result result,
+      List<RequestVoteReplyProto> responses,
+      List<Exception> exceptions, long newTerm) {
+    LOG.info(server.getId() + ": Election " + result + "; received "
+        + responses.size() + " response(s) "
+        + responses.stream().map(r -> ProtoUtils.toString(r)).collect(Collectors.toList())
+        + " and " + exceptions.size() + " exception(s); " + server.getState());
+    int i = 0;
+    for(Exception e : exceptions) {
+      LOG.info("  " + i++ + ": " + e);
+      LOG.trace("TRACE", e);
+    }
+    return new ResultAndTerm(result, newTerm);
+  }
+
+  enum Result {PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN}
+
+  /**
+   * Immutable pair of an election outcome and the term reported with it.
+   */
+  private static class ResultAndTerm {
+    // outcome of the election round
+    final Result result;
+    // term attached to the outcome; waitForResults passes -1 unless a reply
+    // carried a term higher than the election term
+    final long term;
+
+    ResultAndTerm(Result result, long term) {
+      this.result = result;
+      this.term = term;
+    }
+  }
+
+  private final RaftServerImpl server;
+  private ExecutorCompletionService<RequestVoteReplyProto> service;
+  private ExecutorService executor;
+  private volatile boolean running;
+  /**
+   * The Raft configuration should not change while the peer is in candidate
+   * state. If the configuration changes, another peer should be acting as a
+   * leader and this LeaderElection session should end.
+   */
+  private final RaftConfiguration conf;
+  private final Collection<RaftPeer> others;
+
+  /**
+   * Creates an election session for the given server. The server's current
+   * configuration and the set of peers other than the server itself are
+   * captured once here and reused for the whole session.
+   *
+   * @param server the server that runs this election
+   */
+  LeaderElection(RaftServerImpl server) {
+    this.server = server;
+    conf = server.getRaftConf();
+    others = conf.getOtherPeers(server.getId());
+    this.running = true;
+  }
+
+  /**
+   * Clears the running flag. The loops in askForVotes and waitForResults
+   * check this flag and exit once it is false.
+   */
+  void stopRunning() {
+    this.running = false;
+  }
+
+  private void initExecutor() {
+    Preconditions.checkState(!others.isEmpty());
+    executor = Executors.newFixedThreadPool(others.size(),
+        new ThreadFactoryBuilder().setDaemon(true).build());
+    service = new ExecutorCompletionService<>(executor);
+  }
+
+  /**
+   * Thread body: runs the election loop and translates its two failure modes
+   * into log messages. Neither exception is propagated.
+   */
+  @Override
+  public void run() {
+    try {
+      askForVotes();
+    } catch (InterruptedException e) {
+      // the leader election thread is interrupted. The peer may already step
+      // down to a follower. The leader election should skip.
+      LOG.info(server.getId() + " " + getClass().getSimpleName()
+          + " thread is interrupted gracefully; server=" + server);
+    } catch (IOException e) {
+      // askForVotes declares IOException; the metadata persistence at the
+      // start of each round is the I/O step visible in this class.
+      LOG.warn("Failed to persist votedFor/term. Exit the leader election.", e);
+      stopRunning();
+    }
+  }
+
+  /**
+   * After a peer changes its role to candidate, it invokes this method to
+   * send out requestVote rpc to all other peers.
+   *
+   * The method loops, one election round per iteration, until this session is
+   * stopped, the server is no longer a candidate, or a round ends with a
+   * result other than TIMEOUT. A TIMEOUT result starts another round.
+   *
+   * @throws InterruptedException if interrupted while waiting for replies
+   * @throws IOException declared for the metadata persistence step
+   */
+  private void askForVotes() throws InterruptedException, IOException {
+    final ServerState state = server.getState();
+    while (running && server.isCandidate()) {
+      // one round of requestVotes
+      final long electionTerm;
+      synchronized (server) {
+        // initElection() yields the term used for this round
+        // (presumably it also advances the term and votes for self -- confirm
+        // in ServerState); the result is persisted before any request is sent
+        electionTerm = state.initElection();
+        server.getState().persistMetadata();
+      }
+      LOG.info(state.getSelfId() + ": begin an election in Term "
+          + electionTerm);
+
+      TermIndex lastEntry = ServerProtoUtils.toTermIndex(
+          state.getLog().getLastEntry());
+      if (lastEntry == null) {
+        // lastEntry may need to be derived from snapshot
+        SnapshotInfo snapshot = state.getLatestSnapshot();
+        if (snapshot != null) {
+          lastEntry = snapshot.getTermIndex();
+        }
+      }
+
+      final ResultAndTerm r;
+      if (others.isEmpty()) {
+        // no other peer to ask: the round passes immediately
+        r = new ResultAndTerm(Result.PASSED, electionTerm);
+      } else {
+        try {
+          initExecutor();
+          int submitted = submitRequests(electionTerm, lastEntry);
+          r = waitForResults(electionTerm, submitted);
+        } finally {
+          // a new executor is created for every round, so always shut down
+          // the one used by this round
+          if (executor != null) {
+            executor.shutdown();
+          }
+        }
+      }
+
+      synchronized (server) {
+        // re-check under the lock: the state may have changed while waiting
+        if (electionTerm != state.getCurrentTerm() || !running ||
+            !server.isCandidate()) {
+          return; // term already passed or no longer a candidate.
+        }
+
+        switch (r.result) {
+          case PASSED:
+            server.changeToLeader();
+            return;
+          case SHUTDOWN:
+            LOG.info("{} received shutdown response when requesting votes.",
+                server.getId());
+            server.close();
+            return;
+          case REJECTED:
+          case DISCOVERED_A_NEW_TERM:
+            // step down with the larger of the reported term and our own
+            final long term = r.term > server.getState().getCurrentTerm() ?
+                r.term : server.getState().getCurrentTerm();
+            server.changeToFollower(term, true);
+            return;
+          case TIMEOUT:
+            // should start another election
+        }
+      }
+    }
+  }
+
+  /**
+   * Submits one requestVote call per remote peer to the completion service.
+   *
+   * @param electionTerm the term of the current election round
+   * @param lastEntry the term/index passed into each request; may be null
+   * @return the number of requests submitted
+   */
+  private int submitRequests(final long electionTerm, final TermIndex lastEntry) {
+    int count = 0;
+    for (final RaftPeer target : others) {
+      final RequestVoteRequestProto request =
+          server.createRequestVoteRequest(target.getId(), electionTerm, lastEntry);
+      service.submit(() -> server.getServerRpc().requestVote(request));
+      count++;
+    }
+    return count;
+  }
+
+  /**
+   * Collects replies of one election round until an outcome is known.
+   *
+   * Outcomes, in the order they are checked for each reply:
+   * TIMEOUT when the randomized deadline has passed; SHUTDOWN when a reply
+   * asks for shutdown; DISCOVERED_A_NEW_TERM when a reply carries a term
+   * higher than the election term; PASSED when the granted votes form a
+   * majority according to the configuration; REJECTED when the loop ends
+   * without any of the above.
+   *
+   * @param electionTerm the term of the current election round
+   * @param submitted the number of requests that were submitted
+   * @return the outcome, with the newly discovered term or -1
+   * @throws InterruptedException if interrupted while polling for a reply
+   */
+  private ResultAndTerm waitForResults(final long electionTerm,
+      final int submitted) throws InterruptedException {
+    final Timestamp timeout = new Timestamp().addTimeMs(server.getRandomTimeoutMs());
+    final List<RequestVoteReplyProto> responses = new ArrayList<>();
+    final List<Exception> exceptions = new ArrayList<>();
+    int waitForNum = submitted;
+    Collection<String> votedPeers = new ArrayList<>();
+    while (waitForNum > 0 && running && server.isCandidate()) {
+      // timeout lies in the future, so the negated elapsed time is presumably
+      // the remaining time -- confirm against Timestamp.elapsedTimeMs()
+      final long waitTime = -timeout.elapsedTimeMs();
+      if (waitTime <= 0) {
+        return logAndReturn(Result.TIMEOUT, responses, exceptions, -1);
+      }
+
+      try {
+        final Future<RequestVoteReplyProto> future = service.poll(
+            waitTime, TimeUnit.MILLISECONDS);
+        if (future == null) {
+          continue; // poll timeout, continue to return Result.TIMEOUT
+        }
+
+        final RequestVoteReplyProto r = future.get();
+        responses.add(r);
+        if (r.getShouldShutdown()) {
+          return logAndReturn(Result.SHUTDOWN, responses, exceptions, -1);
+        }
+        if (r.getTerm() > electionTerm) {
+          return logAndReturn(Result.DISCOVERED_A_NEW_TERM, responses,
+              exceptions, r.getTerm());
+        }
+        if (r.getServerReply().getSuccess()) {
+          votedPeers.add(r.getServerReply().getReplyId());
+          if (conf.hasMajority(votedPeers, server.getId())) {
+            return logAndReturn(Result.PASSED, responses, exceptions, -1);
+          }
+        }
+      } catch(ExecutionException e) {
+        // a failed request is recorded and still counts as a completed one
+        LOG.info("Got exception when requesting votes: " + e);
+        LOG.trace("TRACE", e);
+        exceptions.add(e);
+      }
+      waitForNum--;
+    }
+    // received all the responses
+    // (the loop also ends when this session is stopped or the server is no
+    // longer a candidate; the result is REJECTED in those cases as well)
+    return logAndReturn(Result.REJECTED, responses, exceptions, -1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
new file mode 100644
index 0000000..e4b2889
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -0,0 +1,601 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_RPC_SLEEP_TIME_MS_DEFAULT;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_RPC_SLEEP_TIME_MS_KEY;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_KEY;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SNAPSHOT_CHUNK_MAX_SIZE_DEFAULT;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SNAPSHOT_CHUNK_MAX_SIZE_KEY;
+import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.STAGINGPROGRESS;
+import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.STEPDOWN;
+import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.UPDATECOMMIT;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.ReconfigurationTimeoutException;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.shaded.proto.RaftProtos.LeaderNoOp;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.Timestamp;
+import org.slf4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * States for leader only. It contains three different types of processors:
+ * 1. RPC senders: each thread is appending log to a follower
+ * 2. EventProcessor: a single thread updating the raft server's state based on
+ *                    status of log appending response
+ * 3. PendingRequestHandler: a handler sending back responses to clients when
+ *                           corresponding log entries are committed
+ */
+public class LeaderState {
+  private static final Logger LOG = RaftServerImpl.LOG;
+
+  enum StateUpdateEventType {
+    STEPDOWN, UPDATECOMMIT, STAGINGPROGRESS
+  }
+
+  enum BootStrapProgress {
+    NOPROGRESS, PROGRESSING, CAUGHTUP
+  }
+
+  /**
+   * An event handed to the EventProcessor: the kind of update to perform and,
+   * for STEPDOWN, the term to step down to. The shared UPDATECOMMIT and
+   * STAGINGPROGRESS instances use -1 as the term.
+   */
+  static class StateUpdateEvent {
+    final StateUpdateEventType type;
+    final long newTerm;
+
+    StateUpdateEvent(StateUpdateEventType type, long newTerm) {
+      this.type = type;
+      this.newTerm = newTerm;
+    }
+
+    /**
+     * Readable form for log messages; without it the event would be logged
+     * as a class name plus identity hash.
+     */
+    @Override
+    public String toString() {
+      return type + "(newTerm=" + newTerm + ")";
+    }
+  }
+
+  static final StateUpdateEvent UPDATE_COMMIT_EVENT =
+      new StateUpdateEvent(StateUpdateEventType.UPDATECOMMIT, -1);
+  static final StateUpdateEvent STAGING_PROGRESS_EVENT =
+      new StateUpdateEvent(StateUpdateEventType.STAGINGPROGRESS, -1);
+
+  private final RaftServerImpl server;
+  private final RaftLog raftLog;
+  private final long currentTerm;
+  private volatile ConfigurationStagingState stagingState;
+  private List<List<FollowerInfo>> voterLists;
+
+  /**
+   * The list of threads appending entries to followers.
+   * The list is protected by the RaftServer's lock.
+   */
+  private final List<LogAppender> senders;
+  private final BlockingQueue<StateUpdateEvent> eventQ;
+  private final EventProcessor processor;
+  private final PendingRequests pendingRequests;
+  private volatile boolean running = true;
+
+  private final int stagingCatchupGap;
+  private final int snapshotChunkMaxSize;
+  private final int syncInterval;
+
+  /**
+   * Builds the leader-side state for the given server: reads the tunables
+   * from the properties, snapshots the current term, and creates (but does
+   * not start) one LogAppender per peer other than the server itself.
+   *
+   * @param server the server that has become leader
+   * @param properties source of the staging catch-up gap, snapshot chunk
+   *                   size and rpc sleep time settings
+   */
+  LeaderState(RaftServerImpl server, RaftProperties properties) {
+    this.server = server;
+
+    stagingCatchupGap = properties.getInt(
+        RAFT_SERVER_STAGING_CATCHUP_GAP_KEY,
+        RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT);
+    snapshotChunkMaxSize = properties.getInt(
+        RAFT_SNAPSHOT_CHUNK_MAX_SIZE_KEY,
+        RAFT_SNAPSHOT_CHUNK_MAX_SIZE_DEFAULT);
+    syncInterval = properties.getInt(
+        RAFT_SERVER_RPC_SLEEP_TIME_MS_KEY,
+        RAFT_SERVER_RPC_SLEEP_TIME_MS_DEFAULT);
+
+    final ServerState state = server.getState();
+    this.raftLog = state.getLog();
+    this.currentTerm = state.getCurrentTerm();
+    eventQ = new ArrayBlockingQueue<>(4096);
+    processor = new EventProcessor();
+    pendingRequests = new PendingRequests(server);
+
+    final RaftConfiguration conf = server.getRaftConf();
+    Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId());
+    // timestamp shifted by the max timeout, handed to every FollowerInfo
+    // (presumably the initial "last rpc" time -- confirm in FollowerInfo)
+    final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
+    final long nextIndex = raftLog.getNextIndex();
+    senders = new ArrayList<>(others.size());
+    for (RaftPeer p : others) {
+      // last argument is true here and false in addSenders
+      // (presumably "attends vote" -- confirm in FollowerInfo)
+      FollowerInfo f = new FollowerInfo(p, t, nextIndex, true);
+      senders.add(server.getLogAppenderFactory().getLogAppender(server, this, f));
+    }
+    // must run after senders is populated: divideFollowers reads it
+    voterLists = divideFollowers(conf);
+  }
+
+  /**
+   * Appends a no-op entry for the current term to the log, then starts the
+   * event processor and all log appenders.
+   */
+  void start() {
+    // In the beginning of the new term, replicate an empty entry in order
+    // to finally commit entries in the previous term.
+    // Also this message can help identify the last committed index when
+    // the leader peer is just started.
+    final LogEntryProto placeHolder = LogEntryProto.newBuilder()
+        .setTerm(server.getState().getCurrentTerm())
+        .setIndex(raftLog.getNextIndex())
+        .setNoOp(LeaderNoOp.newBuilder()).build();
+    raftLog.append(placeHolder);
+
+    processor.start();
+    startSenders();
+  }
+
+  /** Starts the thread of every LogAppender currently in the sender list. */
+  private void startSenders() {
+    senders.forEach(Thread::start);
+  }
+
+  /**
+   * Stops this leader state: clears the running flag, stops and interrupts
+   * every log appender, and answers the pending requests with a not-leader
+   * response. An IOException from the last step is logged, not propagated.
+   */
+  void stop() {
+    this.running = false;
+    // do not interrupt event processor since it may be in the middle of logSync
+    for (LogAppender sender : senders) {
+      sender.stopSender();
+      sender.interrupt();
+    }
+    try {
+      pendingRequests.sendNotLeaderResponses();
+    } catch (IOException e) {
+      LOG.warn("Caught exception in sendNotLeaderResponses", e);
+    }
+  }
+
+  /** Calls notifyAppend on every log appender in the sender list. */
+  void notifySenders() {
+    senders.forEach(LogAppender::notifyAppend);
+  }
+
+  /** @return true while a staging state is set, i.e. it is not null */
+  boolean inStagingState() {
+    return stagingState != null;
+  }
+
+  /** @return the current staging state, or null when there is none */
+  ConfigurationStagingState getStagingState() {
+    return stagingState;
+  }
+
+  /** @return the term captured from the server state at construction */
+  long getCurrentTerm() {
+    return currentTerm;
+  }
+
+  /** @return the configured maximum snapshot chunk size */
+  int getSnapshotChunkMaxSize() {
+    return snapshotChunkMaxSize;
+  }
+
+  /** @return the value read from the rpc sleep time setting */
+  int getSyncInterval() {
+    return syncInterval;
+  }
+
+  /**
+   * Handles a setConfiguration request: registers it as pending, enters the
+   * staging state, and then either starts bootstrapping the peers that are
+   * new to the configuration or, when there are none, applies the (old, new)
+   * configuration right away.
+   *
+   * @param request the setConfiguration request
+   * @return the pending request registered for {@code request}
+   */
+  PendingRequest startSetConfiguration(SetConfigurationRequest request) {
+    Preconditions.checkState(running && !inStagingState());
+
+    final RaftPeer[] targetPeers = request.getPeersInNewConf();
+    final Collection<RaftPeer> toBootStrap =
+        RaftConfiguration.computeNewPeers(targetPeers, server.getRaftConf());
+
+    // register the request first so that it can be answered later
+    final PendingRequest pendingReply = pendingRequests.addConfRequest(request);
+
+    final PeerConfiguration targetConf =
+        new PeerConfiguration(Arrays.asList(targetPeers));
+    final ConfigurationStagingState staging =
+        new ConfigurationStagingState(toBootStrap, targetConf);
+    final Collection<RaftPeer> peersToAdd = staging.getNewPeers();
+    // publish the staging state before acting on it
+    this.stagingState = staging;
+
+    if (!peersToAdd.isEmpty()) {
+      // the new peers need log appenders of their own
+      addSenders(peersToAdd);
+    } else {
+      applyOldNewConf();
+    }
+    return pendingReply;
+  }
+
+  /**
+   * Registers a client request as pending by delegating to the
+   * PendingRequests instance.
+   *
+   * @return the pending request created by the delegate
+   */
+  PendingRequest addPendingRequest(long index, RaftClientRequest request,
+      TransactionContext entry) {
+    return pendingRequests.addPendingRequest(index, request, entry);
+  }
+
+  /**
+   * Generates the (old, new) configuration from the staging state, appends
+   * it to the log, makes it the current configuration, leaves the staging
+   * state and notifies the log appenders.
+   * Reads the staging state, so it must be called while one is set.
+   */
+  private void applyOldNewConf() {
+    final ServerState state = server.getState();
+    final RaftConfiguration current = server.getRaftConf();
+    final RaftConfiguration oldNewConf= stagingState.generateOldNewConf(current,
+        state.getLog().getNextIndex());
+    // apply the (old, new) configuration to log, and use it as the current conf
+    long index = state.getLog().append(state.getCurrentTerm(), oldNewConf);
+    updateConfiguration(index, oldNewConf);
+
+    this.stagingState = null;
+    notifySenders();
+  }
+
+  /**
+   * Recomputes the voter lists for the given configuration and installs the
+   * configuration, with its log index, in the server state.
+   */
+  private void updateConfiguration(long logIndex, RaftConfiguration newConf) {
+    voterLists = divideFollowers(newConf);
+    server.getState().setRaftConf(logIndex, newConf);
+  }
+
+  /**
+   * Creates, registers and starts one log appender for each of the given
+   * peers. Called after a setConfiguration request brings in new members.
+   *
+   * @param newMembers the peers that need a log appender
+   */
+  void addSenders(Collection<RaftPeer> newMembers) {
+    final Timestamp initialTime =
+        new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
+    final long firstIndex = raftLog.getNextIndex();
+    for (RaftPeer member : newMembers) {
+      final FollowerInfo info =
+          new FollowerInfo(member, initialTime, firstIndex, false);
+      final LogAppender appender =
+          server.getLogAppenderFactory().getLogAppender(server, this, info);
+      senders.add(appender);
+      appender.start();
+    }
+  }
+
+  /**
+   * Update the RpcSender list based on the current configuration:
+   * every sender whose follower is not contained in the given configuration
+   * is removed from the list, stopped and interrupted.
+   * Requires a stable configuration and no staging state.
+   */
+  private void updateSenders(RaftConfiguration conf) {
+    Preconditions.checkState(conf.isStable() && !inStagingState());
+    Iterator<LogAppender> iterator = senders.iterator();
+    while (iterator.hasNext()) {
+      LogAppender sender = iterator.next();
+      if (!conf.containsInConf(sender.getFollower().getPeer().getId())) {
+        iterator.remove();
+        sender.stopSender();
+        sender.interrupt();
+      }
+    }
+  }
+
+  /**
+   * Puts a state update event into the queue read by the event processor,
+   * blocking while the queue is full.
+   * If the calling thread is interrupted while waiting, the event is dropped,
+   * the fact is logged, and the thread's interrupt status is restored so the
+   * caller can still observe the interruption.
+   *
+   * @param event the event to enqueue
+   */
+  void submitUpdateStateEvent(StateUpdateEvent event) {
+    try {
+      eventQ.put(event);
+    } catch (InterruptedException e) {
+      LOG.info("Interrupted when adding event {} into the queue", event);
+      // put() cleared the interrupt status when it threw; set it again
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Runs once when the event processor starts: if the leader is still
+   * running and the current configuration is transitional and already
+   * committed, generates and replicates the (new) configuration.
+   */
+  private void prepare() {
+    synchronized (server) {
+      if (!running) {
+        return;
+      }
+      final RaftConfiguration current = server.getRaftConf();
+      final boolean committedTransitional =
+          current.isTransitional() && server.getState().isConfCommitted();
+      if (committedTransitional) {
+        // the (old, new) conf is committed, so move on to (new)
+        replicateNewConf();
+      }
+    }
+  }
+
+  /**
+   * The processor thread takes the responsibility to update the raft server's
+   * state, such as changing to follower, or updating the committed index.
+   *
+   * It polls the event queue with the server's max timeout and handles each
+   * result, including a null result on timeout, while holding the server
+   * lock.
+   */
+  private class EventProcessor extends Daemon {
+    @Override
+    public void run() {
+      // apply an empty message; check if necessary to replicate (new) conf
+      prepare();
+
+      while (running) {
+        try {
+          // event is null when nothing arrived within the timeout
+          StateUpdateEvent event = eventQ.poll(server.getMaxTimeoutMs(),
+              TimeUnit.MILLISECONDS);
+          synchronized (server) {
+            // re-check under the lock: stop() may have run in the meantime
+            if (running) {
+              handleEvent(event);
+            }
+          }
+          // the updated configuration does not need to be sync'ed here
+        } catch (InterruptedException e) {
+          final String s = server.getId() + " " + getClass().getSimpleName()
+              + " thread is interrupted ";
+          if (!running) {
+            LOG.info(s + " gracefully; server=" + server);
+          } else {
+            LOG.warn(s + " UNEXPECTEDLY; server=" + server, e);
+            throw new RuntimeException(e);
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to persist new votedFor/term.", e);
+          // the failure should happen while changing the state to follower
+          // thus the in-memory state should have been updated
+          Preconditions.checkState(!running);
+        }
+      }
+    }
+  }
+
+  /**
+   * Dispatches one result of polling the event queue.
+   *
+   * @param e the event, or null when the poll timed out
+   * @throws IOException if changing to follower fails
+   */
+  private void handleEvent(StateUpdateEvent e) throws IOException {
+    if (e == null) {
+      // nothing arrived in time; use the tick to re-check staging peers
+      if (inStagingState()) {
+        checkNewPeers();
+      }
+      return;
+    }
+    if (STEPDOWN == e.type) {
+      server.changeToFollower(e.newTerm, true);
+      return;
+    }
+    if (UPDATECOMMIT == e.type) {
+      updateLastCommitted();
+      return;
+    }
+    if (STAGINGPROGRESS == e.type) {
+      checkNewPeers();
+    }
+  }
+
+  /**
+   * So far we use a simple implementation for catchup checking:
+   * 1. If the latest rpc time of the remote peer is before 3 * max_timeout,
+   *    the peer made no progress for that long. We should fail the whole
+   *    setConfiguration request.
+   * 2. If the peer's matching index is just behind for a small gap, and the
+   *    peer was updated recently (within max_timeout), declare the peer as
+   *    caught-up.
+   * 3. Otherwise the peer is making progress. Keep waiting.
+   *
+   * @param follower a follower that is not attending vote
+   * @param committed the committed index to compare the follower against
+   * @return the bootstrap progress of the follower
+   */
+  private BootStrapProgress checkProgress(FollowerInfo follower,
+      long committed) {
+    Preconditions.checkArgument(!follower.isAttendingVote());
+    final Timestamp progressTime = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
+    final Timestamp timeoutTime = new Timestamp().addTimeMs(-3*server.getMaxTimeoutMs());
+    if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) {
+      LOG.debug("{} detects a follower {} timeout for bootstrapping," +
+              " timeoutTime: {}", server.getId(), follower, timeoutTime);
+      return BootStrapProgress.NOPROGRESS;
+    } else if (follower.getMatchIndex() + stagingCatchupGap > committed
+        && follower.getLastRpcResponseTime().compareTo(progressTime) > 0) {
+      return BootStrapProgress.CAUGHTUP;
+    } else {
+      return BootStrapProgress.PROGRESSING;
+    }
+  }
+
+  /**
+   * Evaluates the bootstrap progress of every follower that is not attending
+   * vote. Must be called while in the staging state.
+   *
+   * @param committed the committed index to compare the followers against
+   * @return one progress report per such follower
+   */
+  private Collection<BootStrapProgress> checkAllProgress(long committed) {
+    Preconditions.checkState(inStagingState());
+    return senders.stream()
+        .filter(sender -> !sender.getFollower().isAttendingVote())
+        .map(sender -> checkProgress(sender.getFollower(), committed))
+        .collect(Collectors.toCollection(ArrayList::new));
+  }
+
+  /**
+   * Checks how the bootstrapping peers are doing and acts on the result:
+   * fails the staging state if any peer made no progress, applies the
+   * (old, new) configuration if none is still progressing, and otherwise
+   * does nothing. Outside the staging state it only updates the commit index.
+   */
+  private void checkNewPeers() {
+    if (!inStagingState()) {
+      // it is possible that the bootstrapping is done and we still have
+      // remaining STAGINGPROGRESS event to handle.
+      updateLastCommitted();
+    } else {
+      final long committedIndex = server.getState().getLog()
+          .getLastCommittedIndex();
+      Collection<BootStrapProgress> reports = checkAllProgress(committedIndex);
+      if (reports.contains(BootStrapProgress.NOPROGRESS)) {
+        LOG.debug("{} fails the setConfiguration request", server.getId());
+        stagingState.fail();
+      } else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
+        // all caught up!
+        applyOldNewConf();
+        // called on every sender, not only on the newly added ones
+        for (LogAppender sender : senders) {
+          sender.getFollower().startAttendVote();
+        }
+      }
+    }
+  }
+
+  /**
+   * @return true if a staging state is set and it contains the given peer id
+   */
+  boolean isBootStrappingPeer(String peerId) {
+    return inStagingState() && getStagingState().contains(peerId);
+  }
+
+  /**
+   * Recomputes the index replicated on a majority and hands it to the server
+   * state, then checks whether a configuration entry got committed.
+   * In a transitional configuration the smaller of the majorities of the new
+   * and the old configuration is used.
+   */
+  private void updateLastCommitted() {
+    final String selfId = server.getId();
+    final RaftConfiguration conf = server.getRaftConf();
+    final long newConfMajority = computeLastCommitted(voterLists.get(0),
+        conf.containsInConf(selfId));
+    final long previousCommitted = raftLog.getLastCommittedIndex();
+
+    final long majority;
+    if (conf.isTransitional()) {
+      final long oldConfMajority = computeLastCommitted(voterLists.get(1),
+          conf.containsInOldConf(selfId));
+      majority = Math.min(newConfMajority, oldConfMajority);
+    } else {
+      majority = newConfMajority;
+    }
+
+    // copy the entries that may get committed out of the raftlog, to prevent
+    // the possible race that the log gets purged after the statemachine does
+    // a snapshot
+    final LogEntryProto[] entriesToCommit = raftLog.getEntries(
+        previousCommitted + 1, Math.max(majority, previousCommitted) + 1);
+    server.getState().updateStatemachine(majority, currentTerm);
+    checkAndUpdateConfiguration(entriesToCommit);
+  }
+
+  /**
+   * @param entries the log entries to inspect
+   * @return true if any of the entries is a configuration entry whose index
+   *         is not beyond the log's last committed index
+   */
+  private boolean committedConf(LogEntryProto[] entries) {
+    final long committedIndex = raftLog.getLastCommittedIndex();
+    return Arrays.stream(entries).anyMatch(entry ->
+        entry.getIndex() <= committedIndex
+            && ProtoUtils.isConfigurationLogEntry(entry));
+  }
+
+  /**
+   * Reacts to a committed configuration entry among the given entries.
+   * If the current configuration is transitional, the (new) configuration is
+   * replicated next. Otherwise the setConfiguration request is answered, and
+   * if this server is not part of the configuration it closes itself after
+   * sleeping for the server's min timeout.
+   *
+   * @param entriesToCheck the entries to search for a committed
+   *                       configuration entry
+   */
+  private void checkAndUpdateConfiguration(LogEntryProto[] entriesToCheck) {
+    final RaftConfiguration conf = server.getRaftConf();
+    if (committedConf(entriesToCheck)) {
+      if (conf.isTransitional()) {
+        replicateNewConf();
+      } else { // the (new) log entry has been committed
+        LOG.debug("{} sends success to setConfiguration request", server.getId());
+        pendingRequests.replySetConfiguration();
+        // if the leader is not included in the current configuration, step down
+        if (!conf.containsInConf(server.getId())) {
+          LOG.info("{} is not included in the new configuration {}. Step down.",
+              server.getId(), conf);
+          try {
+            // leave some time for all RPC senders to send out new conf entry
+            Thread.sleep(server.getMinTimeoutMs());
+          } catch (InterruptedException ignored) {
+            // an interrupt only cuts the grace period short; close() follows
+          }
+          // the pending request handler will send NotLeaderException for
+          // pending client requests when it stops
+          server.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * when the (old, new) log entry has been committed, should replicate (new):
+   * 1) append (new) to log
+   * 2) update conf to (new)
+   * 3) update RpcSenders list
+   * 4) start replicating the log entry
+   *
+   * Note that the code stops the obsolete senders first, before the append.
+   */
+  private void replicateNewConf() {
+    final RaftConfiguration conf = server.getRaftConf();
+    final RaftConfiguration newConf = RaftConfiguration.newBuilder()
+        .setConf(conf)
+        .setLogEntryIndex(raftLog.getNextIndex())
+        .build();
+    // stop the LogAppender if the corresponding follower is no longer in the conf
+    updateSenders(newConf);
+    long index = raftLog.append(server.getState().getCurrentTerm(), newConf);
+    updateConfiguration(index, newConf);
+    notifySenders();
+  }
+
+  /**
+   * Computes the highest index that a majority of the given voters has
+   * reached: the match indices (plus the local flushed index when the leader
+   * itself counts) are sorted in ascending order and the element at position
+   * (n - 1) / 2 is returned, so that more than half of the n values are
+   * greater than or equal to it.
+   *
+   * @param followers the followers whose match indices are considered
+   * @param includeSelf whether the leader's own flushed index is included
+   * @return the majority index
+   */
+  private long computeLastCommitted(List<FollowerInfo> followers,
+      boolean includeSelf) {
+    final int length = includeSelf ? followers.size() + 1 : followers.size();
+    final long[] indices = new long[length];
+    for (int i = 0; i < followers.size(); i++) {
+      indices[i] = followers.get(i).getMatchIndex();
+    }
+    if (includeSelf) {
+      // note that we also need to wait for the local disk I/O
+      indices[length - 1] = raftLog.getLatestFlushedIndex();
+    }
+
+    Arrays.sort(indices);
+    // NOTE(review): with no followers and includeSelf == false the array is
+    // empty and this access throws -- confirm callers never hit that case
+    return indices[(indices.length - 1) / 2];
+  }
+
+  /**
+   * Splits the followers of the current senders by configuration membership.
+   *
+   * @param conf the configuration to split by
+   * @return a list whose element 0 holds the followers contained in the
+   *         configuration and, only when the configuration is transitional,
+   *         whose element 1 holds the followers contained in the old
+   *         configuration
+   */
+  private List<List<FollowerInfo>> divideFollowers(RaftConfiguration conf) {
+    List<List<FollowerInfo>> lists = new ArrayList<>(2);
+    List<FollowerInfo> listForNew = senders.stream()
+        .filter(sender -> conf.containsInConf(sender.getFollower().getPeer().getId()))
+        .map(LogAppender::getFollower)
+        .collect(Collectors.toList());
+    lists.add(listForNew);
+    if (conf.isTransitional()) {
+      List<FollowerInfo> listForOld = senders.stream()
+          .filter(sender -> conf.containsInOldConf(sender.getFollower().getPeer().getId()))
+          .map(LogAppender::getFollower)
+          .collect(Collectors.toList());
+      lists.add(listForOld);
+    }
+    return lists;
+  }
+
+  /**
+   * Creates a pending request for the given setConfiguration request and
+   * immediately sets a success reply on it, passing null as the argument.
+   * The request is not added to the pending request collection.
+   */
+  PendingRequest returnNoConfChange(SetConfigurationRequest r) {
+    PendingRequest pending = new PendingRequest(r);
+    pending.setSuccessReply(null);
+    return pending;
+  }
+
+  /** Delegates the reply for the given log index to the pending requests. */
+  void replyPendingRequest(long logIndex, CompletableFuture<Message> message) {
+    pendingRequests.replyPendingRequest(logIndex, message);
+  }
+
+  /** Looks up the transaction context for the given index in the pending requests. */
+  TransactionContext getTransactionContext(long index) {
+    return pendingRequests.getTransactionContext(index);
+  }
+
+  /**
+   * State kept while a setConfiguration request is being staged: the peers
+   * that have to be bootstrapped, keyed by id, and the requested new peer
+   * configuration.
+   */
+  private class ConfigurationStagingState {
+    // peers to bootstrap, keyed by peer id; unmodifiable
+    private final Map<String, RaftPeer> newPeers;
+    // the peer configuration requested by the client
+    private final PeerConfiguration newConf;
+
+    ConfigurationStagingState(Collection<RaftPeer> newPeers,
+        PeerConfiguration newConf) {
+      Map<String, RaftPeer> map = new HashMap<>();
+      for (RaftPeer peer : newPeers) {
+        map.put(peer.getId(), peer);
+      }
+      this.newPeers = Collections.unmodifiableMap(map);
+      this.newConf = newConf;
+    }
+
+    /**
+     * Builds the configuration that has the requested configuration as conf
+     * and the given current configuration as old conf.
+     */
+    RaftConfiguration generateOldNewConf(RaftConfiguration current,
+        long logIndex) {
+      return RaftConfiguration.newBuilder()
+          .setConf(newConf)
+          .setOldConf(current)
+          .setLogEntryIndex(logIndex)
+          .build();
+    }
+
+    /** @return the peers to bootstrap */
+    Collection<RaftPeer> getNewPeers() {
+      return newPeers.values();
+    }
+
+    /** @return true if the given id belongs to a peer to bootstrap */
+    boolean contains(String peerId) {
+      return newPeers.containsKey(peerId);
+    }
+
+    /**
+     * Aborts the staging: removes, stops and interrupts every sender whose
+     * follower is not attending vote, clears the staging state of the
+     * enclosing LeaderState, and fails the pending setConfiguration request
+     * with a ReconfigurationTimeoutException.
+     */
+    void fail() {
+      Iterator<LogAppender> iterator = senders.iterator();
+      while (iterator.hasNext()) {
+        LogAppender sender = iterator.next();
+        if (!sender.getFollower().isAttendingVote()) {
+          iterator.remove();
+          sender.stopSender();
+          sender.interrupt();
+        }
+      }
+      LeaderState.this.stagingState = null;
+      // send back failure response to client's request
+      pendingRequests.failSetConfiguration(
+          new ReconfigurationTimeoutException("Fail to set configuration "
+              + newConf + ". Timeout when bootstrapping new peers."));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
new file mode 100644
index 0000000..5599699
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -0,0 +1,494 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_DEFAULT;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_KEY;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_DEFAULT;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_KEY;
+import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.ratis.server.impl.LeaderState.StateUpdateEventType;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.Timestamp;
+import org.slf4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * A daemon thread appending log entries to a follower peer.
+ */
+public class LogAppender extends Daemon {
+  public static final Logger LOG = RaftServerImpl.LOG;
+
+  protected final RaftServerImpl server;
+  private final LeaderState leaderState;
+  protected final RaftLog raftLog;
+  protected final FollowerInfo follower;
+  private final int maxBufferSize;
+  private final boolean batchSending;
+  private final LogEntryBuffer buffer;
+  private final long leaderTerm;
+
+  private volatile boolean sending = true;
+
+  /**
+   * Creates an appender for the follower {@code f}.
+   *
+   * The buffer capacity and the batch-sending flag are read from the server
+   * properties, and the server's current term is captured once as the term
+   * used for the append requests built by this appender.
+   *
+   * @param server the local server; supplies the log, properties and term
+   * @param leaderState the leader state that receives state update events
+   * @param f the follower this appender sends to
+   */
+  public LogAppender(RaftServerImpl server, LeaderState leaderState, FollowerInfo f) {
+    this.server = server;
+    this.leaderState = leaderState;
+    this.follower = f;
+    this.raftLog = server.getState().getLog();
+
+    // sending policy: buffer capacity and whether to batch entries
+    this.maxBufferSize = server.getProperties().getInt(
+        RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_KEY,
+        RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_DEFAULT);
+    this.batchSending = server.getProperties().getBoolean(
+        RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_KEY,
+        RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_DEFAULT);
+
+    this.buffer = new LogEntryBuffer();
+    this.leaderTerm = server.getState().getCurrentTerm();
+  }
+
+  /** @return {@code SimpleClassName(serverId -> followerPeerId)}. */
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder(getClass().getSimpleName());
+    b.append("(").append(server.getId());
+    b.append(" -> ").append(follower.getPeer().getId());
+    return b.append(")").toString();
+  }
+
+  /**
+   * Thread body: runs the check-and-send loop until it returns or this
+   * thread is interrupted. An interruption is logged at INFO level and ends
+   * the method normally.
+   */
+  @Override
+  public void run() {
+    try {
+      checkAndSendAppendEntries();
+    } catch (InterruptedException | InterruptedIOException e) {
+      LOG.info(this + " was interrupted: " + e);
+    }
+  }
+
+  /** @return true until {@link #stopSender()} has been called. */
+  protected boolean isAppenderRunning() {
+    return sending;
+  }
+
+  /**
+   * Clears the volatile {@code sending} flag so that the sending loops exit
+   * at their next check. This method does not interrupt the thread.
+   */
+  public void stopSender() {
+    this.sending = false;
+  }
+
+  /** @return the follower this appender sends to. */
+  public FollowerInfo getFollower() {
+    return follower;
+  }
+
+  /**
+   * Collects the log entries for the next appendEntries request and tracks
+   * the sum of their serialized sizes against {@code maxBufferSize}.
+   */
+  private class LogEntryBuffer {
+    private final List<LogEntryProto> entries = new ArrayList<>();
+    private int bytes = 0;
+
+    /** Appends {@code entry} and adds its serialized size to the total. */
+    void addEntry(LogEntryProto entry) {
+      entries.add(entry);
+      bytes += entry.getSerializedSize();
+    }
+
+    /** @return true once the total size has reached {@code maxBufferSize}. */
+    boolean isFull() {
+      return maxBufferSize <= bytes;
+    }
+
+    /** @return true if no entry is buffered. */
+    boolean isEmpty() {
+      return entries.isEmpty();
+    }
+
+    /**
+     * Builds an append request from the buffered entries and then empties
+     * the buffer.
+     *
+     * @param previous the term and index passed to the request as the
+     *                 previous entry; may be null
+     * @return the request created by the server
+     */
+    AppendEntriesRequestProto getAppendRequest(TermIndex previous) {
+      final AppendEntriesRequestProto appendRequest = server
+          .createAppendEntriesRequest(leaderTerm, follower.getPeer().getId(),
+              previous, entries, !follower.isAttendingVote());
+      // the request has been built; start over with an empty buffer
+      entries.clear();
+      bytes = 0;
+      return appendRequest;
+    }
+
+    /** @return the number of buffered entries. */
+    int getPendingEntryNum() {
+      return entries.size();
+    }
+  }
+
+  private TermIndex getPrevious() {
+    TermIndex previous = ServerProtoUtils.toTermIndex(
+        raftLog.get(follower.getNextIndex() - 1));
+    if (previous == null) {
+      // if previous is null, nextIndex must be equal to the log start
+      // index (otherwise we will install snapshot).
+      Preconditions.checkState(follower.getNextIndex() == raftLog.getStartIndex(),
+          "follower's next index %s, local log start index %s",
+          follower.getNextIndex(), raftLog.getStartIndex());
+      SnapshotInfo snapshot = server.getState().getLatestSnapshot();
+      previous = snapshot == null ? null : snapshot.getTermIndex();
+    }
+    return previous;
+  }
+
+  /**
+   * Moves new log entries into the buffer and decides whether a request
+   * should be sent now.
+   *
+   * A request is built when the buffer is full, when batch sending is
+   * disabled and there are new entries, when there are no new entries but
+   * the buffer is not empty, or when a heartbeat is due.
+   *
+   * @return the request to send, or null if nothing needs to be sent yet
+   */
+  protected AppendEntriesRequestProto createRequest() {
+    final TermIndex previous = getPrevious();
+    final long leaderNext = raftLog.getNextIndex();
+    // index of the first log entry that is not yet in the buffer
+    long next = follower.getNextIndex() + buffer.getPendingEntryNum();
+
+    final boolean toSend;
+    if (leaderNext > next) {
+      // stop adding entry once the buffer size is >= the max size
+      for (; leaderNext > next && !buffer.isFull(); next++) {
+        buffer.addEntry(raftLog.get(next));
+      }
+      // buffer is full or batch sending is disabled, send out a request
+      toSend = buffer.isFull() || !batchSending;
+    } else {
+      // no new entries, then send out the entries in the buffer
+      toSend = leaderNext == next && !buffer.isEmpty();
+    }
+
+    return (toSend || shouldHeartbeat()) ? buffer.getAppendRequest(previous) : null;
+  }
+
+  /**
+   * Send an appendEntries RPC; retry indefinitely.
+   *
+   * An {@link IOException} other than {@link InterruptedIOException} is
+   * logged at DEBUG level and the call is retried after sleeping for the
+   * leader's sync interval, for as long as this appender is running.
+   *
+   * @return the reply of the RPC, or null if there is nothing to send or
+   *         this appender has been stopped
+   * @throws InterruptedException if interrupted while sleeping before a retry
+   * @throws InterruptedIOException rethrown as is when raised inside the loop
+   */
+  private AppendEntriesReplyProto sendAppendEntriesWithRetries()
+      throws InterruptedException, InterruptedIOException {
+    int retry = 0;
+    AppendEntriesRequestProto request = null;
+    while (isAppenderRunning()) { // keep retrying for IOException
+      try {
+        // a request that carries entries is reused on retry; a missing or
+        // entry-less request is (re)created
+        if (request == null || request.getEntriesCount() == 0) {
+          request = createRequest();
+        }
+
+        if (request == null) {
+          LOG.trace("{} need not send AppendEntries now." +
+              " Wait for more entries.", server.getId());
+          return null;
+        } else if (!isAppenderRunning()) {
+          LOG.debug("LogAppender {} has been stopped. Skip the request.", this);
+          return null;
+        }
+
+        // record the send and response times around the RPC
+        follower.updateLastRpcSendTime();
+        final AppendEntriesReplyProto r = server.getServerRpc()
+            .appendEntries(request);
+        follower.updateLastRpcResponseTime();
+
+        return r;
+      } catch (InterruptedIOException iioe) {
+        throw iioe;
+      } catch (IOException ioe) {
+        LOG.debug(this + ": failed to send appendEntries; retry " + retry++, ioe);
+      }
+      // only reached after a failed attempt; back off before retrying
+      if (isAppenderRunning()) {
+        Thread.sleep(leaderState.getSyncInterval());
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Produces the sequence of InstallSnapshot requests for a snapshot, one
+   * file chunk per request, reading the snapshot files in list order.
+   *
+   * The cursor (current file, offset, chunk index, open stream) lives in
+   * this object, not in the iterator returned by {@link #iterator()}, so all
+   * iterators obtained from one instance share the same position.
+   */
+  protected class SnapshotRequestIter
+      implements Iterable<InstallSnapshotRequestProto> {
+    private final SnapshotInfo snapshot;
+    private final List<FileInfo> files;
+    // stream over the file at fileIndex; null if the snapshot has no files
+    private FileInputStream in;
+    private int fileIndex = 0;
+
+    private FileInfo currentFileInfo;
+    // reused for every chunk of the current file
+    private byte[] currentBuf;
+    private long currentFileSize;
+    private long currentOffset = 0;
+    private int chunkIndex = 0;
+
+    private final String requestId;
+    // running index over all requests produced by this object
+    private int requestIndex = 0;
+
+    /**
+     * Opens the first snapshot file, if there is one.
+     *
+     * @param snapshot the snapshot to send
+     * @param requestId the id passed to every request that is created
+     * @throws IOException if the first file cannot be opened
+     */
+    public SnapshotRequestIter(SnapshotInfo snapshot, String requestId)
+        throws IOException {
+      this.snapshot = snapshot;
+      this.requestId = requestId;
+      this.files = snapshot.getFiles();
+      if (files.size() > 0) {
+        startReadFile();
+      }
+    }
+
+    /**
+     * Opens the file at {@code fileIndex} and resets the per-file cursor.
+     * The buffer is sized to the smaller of the chunk limit and the file
+     * length.
+     */
+    private void startReadFile() throws IOException {
+      currentFileInfo = files.get(fileIndex);
+      File snapshotFile = currentFileInfo.getPath().toFile();
+      currentFileSize = snapshotFile.length();
+      final int bufLength =
+          (int) Math.min(leaderState.getSnapshotChunkMaxSize(), currentFileSize);
+      currentBuf = new byte[bufLength];
+      currentOffset = 0;
+      chunkIndex = 0;
+      in = new FileInputStream(snapshotFile);
+    }
+
+    @Override
+    public Iterator<InstallSnapshotRequestProto> iterator() {
+      return new Iterator<InstallSnapshotRequestProto>() {
+        @Override
+        public boolean hasNext() {
+          return fileIndex < files.size();
+        }
+
+        /**
+         * Reads the next chunk of the current file and wraps it in a
+         * request. An IOException closes the current stream and is rethrown
+         * wrapped in a RuntimeException.
+         */
+        @Override
+        public InstallSnapshotRequestProto next() {
+          if (fileIndex >= files.size()) {
+            throw new NoSuchElementException();
+          }
+          // the last chunk of a file may be shorter than the chunk limit
+          int targetLength = (int) Math.min(currentFileSize - currentOffset,
+              leaderState.getSnapshotChunkMaxSize());
+          FileChunkProto chunk;
+          try {
+            chunk = readFileChunk(currentFileInfo, in, currentBuf,
+                targetLength, currentOffset, chunkIndex);
+            // done only for the final chunk of the final file
+            boolean done = (fileIndex == files.size() - 1) &&
+                chunk.getDone();
+            InstallSnapshotRequestProto request =
+                server.createInstallSnapshotRequest(follower.getPeer().getId(),
+                    requestId, requestIndex++, snapshot,
+                    Lists.newArrayList(chunk), done);
+            currentOffset += targetLength;
+            chunkIndex++;
+
+            // current file fully read: close it and move on to the next one
+            if (currentOffset >= currentFileSize) {
+              in.close();
+              fileIndex++;
+              if (fileIndex < files.size()) {
+                startReadFile();
+              }
+            }
+
+            return request;
+          } catch (IOException e) {
+            if (in != null) {
+              try {
+                in.close();
+              } catch (IOException ignored) {
+              }
+            }
+            LOG.warn("Got exception when preparing InstallSnapshot request", e);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+    }
+  }
+
+  private FileChunkProto readFileChunk(FileInfo fileInfo,
+      FileInputStream in, byte[] buf, int length, long offset, int chunkIndex)
+      throws IOException {
+    FileChunkProto.Builder builder = FileChunkProto.newBuilder()
+        .setOffset(offset).setChunkIndex(chunkIndex);
+    IOUtils.readFully(in, buf, 0, length);
+    Path relativePath = server.getState().getStorage().getStorageDir()
+        .relativizeToRoot(fileInfo.getPath());
+    builder.setFilename(relativePath.toString());
+    builder.setDone(offset + length == fileInfo.getFileSize());
+    builder.setFileDigest(
+        ByteString.copyFrom(fileInfo.getFileDigest().getDigest()));
+    builder.setData(ByteString.copyFrom(buf, 0, length));
+    return builder.build();
+  }
+
+  /**
+   * Sends the given snapshot to the follower, one request per file chunk.
+   *
+   * Sending stops at the first unsuccessful reply, which is returned as is.
+   * When all requests succeed, the follower's match index and next index are
+   * advanced past the snapshot index.
+   *
+   * The snapshot file stream opened by the request iterator is always closed
+   * before this method returns. Without that, an early return or a failed
+   * RPC would leave the stream open, and every retry would open another.
+   *
+   * @param snapshot the snapshot to install
+   * @return the last reply received, or null if sending failed with an
+   *         exception or the snapshot produced no request
+   * @throws InterruptedIOException rethrown as is when raised while sending
+   */
+  private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot)
+      throws InterruptedException, InterruptedIOException {
+    String requestId = UUID.randomUUID().toString();
+    InstallSnapshotReplyProto reply = null;
+    SnapshotRequestIter requests = null;
+    try {
+      requests = new SnapshotRequestIter(snapshot, requestId);
+      for (InstallSnapshotRequestProto request : requests) {
+        follower.updateLastRpcSendTime();
+        reply = server.getServerRpc().installSnapshot(request);
+        follower.updateLastRpcResponseTime();
+
+        if (!reply.getServerReply().getSuccess()) {
+          return reply;
+        }
+      }
+    } catch (InterruptedIOException iioe) {
+      throw iioe;
+    } catch (Exception ioe) {
+      LOG.warn(this + ": failed to install SnapshotInfo " + snapshot.getFiles(),
+          ioe);
+      return null;
+    } finally {
+      // The iterator closes a file only after reading it completely, so an
+      // early exit leaves the current stream open. Closing a stream that is
+      // already closed has no effect.
+      if (requests != null && requests.in != null) {
+        try {
+          requests.in.close();
+        } catch (IOException ignored) {
+          // best effort: the stream was only read from
+        }
+      }
+    }
+
+    if (reply != null) {
+      follower.updateMatchIndex(snapshot.getTermIndex().getIndex());
+      follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1);
+      LOG.info("{}: install snapshot-{} successfully on follower {}",
+          server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer());
+    }
+    return reply;
+  }
+
+  /**
+   * Decides whether the follower must be brought up to date with a snapshot
+   * instead of log entries.
+   *
+   * @return the latest snapshot (possibly null) if the follower's next index
+   *         is behind the log's next index and either it is also behind the
+   *         log start index, or the log start index is
+   *         {@code INVALID_LOG_INDEX} and a snapshot exists; null otherwise
+   */
+  protected SnapshotInfo shouldInstallSnapshot() {
+    final long logStartIndex = raftLog.getStartIndex();
+    if (follower.getNextIndex() >= raftLog.getNextIndex()) {
+      // the follower is not behind the local log; nothing to install
+      return null;
+    }
+
+    // we should install snapshot if the follower needs to catch up and:
+    // 1. there is no local log entry but there is snapshot
+    // 2. or the follower's next index is smaller than the log start index
+    final SnapshotInfo snapshot = server.getState().getLatestSnapshot();
+    final boolean behindLogStart = follower.getNextIndex() < logStartIndex;
+    final boolean onlySnapshot =
+        logStartIndex == INVALID_LOG_INDEX && snapshot != null;
+    return (behindLogStart || onlySnapshot) ? snapshot : null;
+  }
+
+  /**
+   * Check and send appendEntries RPC.
+   *
+   * Main loop of the appender. While the appender is running, each round
+   * either installs a snapshot or sends an appendEntries request when there
+   * is something to send, then waits on this object for the time remaining
+   * until the next heartbeat unless more entries are already available.
+   *
+   * @throws InterruptedException if interrupted while waiting or sleeping
+   * @throws InterruptedIOException if raised while sending
+   */
+  private void checkAndSendAppendEntries()
+      throws InterruptedException, InterruptedIOException {
+    while (isAppenderRunning()) {
+      if (shouldSendRequest()) {
+        SnapshotInfo snapshot = shouldInstallSnapshot();
+        if (snapshot != null) {
+          LOG.info("{}: follower {}'s next index is {}," +
+              " log's start index is {}, need to install snapshot",
+              server.getId(), follower.getPeer(), follower.getNextIndex(),
+              raftLog.getStartIndex());
+
+          final InstallSnapshotReplyProto r = installSnapshot(snapshot);
+          if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
+            checkResponseTerm(r.getTerm());
+          } // otherwise if r is null, retry the snapshot installation
+        } else {
+          final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
+          if (r != null) {
+            handleReply(r);
+          }
+        }
+      }
+      // wait only if every log entry is either acknowledged or buffered
+      if (isAppenderRunning() && !shouldAppendEntries(
+          follower.getNextIndex() + buffer.getPendingEntryNum())) {
+        final long waitTime = getHeartbeatRemainingTime(
+            follower.getLastRpcTime());
+        if (waitTime > 0) {
+          // notifyAppend() wakes this wait up early
+          synchronized (this) {
+            wait(waitTime);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Applies the result of an appendEntries reply to the follower state.
+   *
+   * On success with a larger next index, the follower's match and next
+   * indices are advanced and a state update event is submitted. A
+   * not-leader reply triggers a term check, and an inconsistency reply
+   * decreases the follower's next index. A null reply is ignored.
+   *
+   * @param reply the reply to handle; may be null
+   * @throws IllegalStateException if a successful reply carries a next index
+   *         smaller than the follower's current next index
+   */
+  private void handleReply(AppendEntriesReplyProto reply) {
+    if (reply == null) {
+      return;
+    }
+
+    switch (reply.getResult()) {
+      case SUCCESS: {
+        final long oldNextIndex = follower.getNextIndex();
+        final long nextIndex = reply.getNextIndex();
+        if (nextIndex < oldNextIndex) {
+          throw new IllegalStateException("nextIndex=" + nextIndex
+              + " < oldNextIndex=" + oldNextIndex
+              + ", reply=" + ProtoUtils.toString(reply));
+        }
+
+        // an unchanged next index means there is no progress to record
+        if (nextIndex > oldNextIndex) {
+          follower.updateMatchIndex(nextIndex - 1);
+          follower.updateNextIndex(nextIndex);
+          submitEventOnSuccessAppend();
+        }
+        break;
+      }
+      case NOT_LEADER: {
+        // check if should step down
+        checkResponseTerm(reply.getTerm());
+        break;
+      }
+      case INCONSISTENCY: {
+        follower.decreaseNextIndex(reply.getNextIndex());
+        break;
+      }
+      case UNRECOGNIZED: {
+        LOG.warn("{} received UNRECOGNIZED AppendResult from {}",
+            server.getId(), follower.getPeer().getId());
+        break;
+      }
+    }
+  }
+
+  protected void submitEventOnSuccessAppend() {
+    LeaderState.StateUpdateEvent e = follower.isAttendingVote() ?
+        LeaderState.UPDATE_COMMIT_EVENT :
+        LeaderState.STAGING_PROGRESS_EVENT;
+    leaderState.submitUpdateStateEvent(e);
+  }
+
+  /**
+   * Wakes up one thread waiting on this object, such as the appender thread
+   * waiting in its send loop for the next heartbeat.
+   */
+  public synchronized void notifyAppend() {
+    this.notify();
+  }
+
+  /**
+   * Should the leader send appendEntries RPC to this follower?
+   *
+   * @return true if the follower's next index is behind the log's next
+   *         index, or if a heartbeat is due
+   */
+  protected boolean shouldSendRequest() {
+    return shouldAppendEntries(follower.getNextIndex()) || shouldHeartbeat();
+  }
+
+  /** @return true if {@code followerIndex} is before the log's next index. */
+  private boolean shouldAppendEntries(long followerIndex) {
+    return followerIndex < raftLog.getNextIndex();
+  }
+
+  /**
+   * @return true if no time remains before the next heartbeat, measured from
+   *         the follower's last RPC time
+   */
+  private boolean shouldHeartbeat() {
+    return getHeartbeatRemainingTime(follower.getLastRpcTime()) <= 0;
+  }
+
+  /**
+   * Computes how long the leader may still wait before it should send a
+   * heartbeat.
+   *
+   * @param lastTime the timestamp the elapsed time is measured from
+   * @return half of the server's minimum timeout minus the time elapsed
+   *         since {@code lastTime}, in milliseconds; zero or negative when a
+   *         heartbeat is due
+   */
+  protected long getHeartbeatRemainingTime(Timestamp lastTime) {
+    return server.getMinTimeoutMs() / 2 - lastTime.elapsedTimeMs();
+  }
+
+  protected void checkResponseTerm(long responseTerm) {
+    synchronized (server) {
+      if (isAppenderRunning() && follower.isAttendingVote()
+          && responseTerm > leaderState.getCurrentTerm()) {
+        leaderState.submitUpdateStateEvent(
+            new LeaderState.StateUpdateEvent(StateUpdateEventType.STEPDOWN,
+                responseTerm));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java
new file mode 100644
index 0000000..e6cc213
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+/** Creates {@link LogAppender} instances. */
+public interface LogAppenderFactory {
+  /**
+   * @param server the local server passed to the appender
+   * @param state the leader state passed to the appender
+   * @param f the follower the appender sends to
+   * @return an appender for the given follower
+   */
+  LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
+                             FollowerInfo f);
+
+  /** Factory that returns a new plain {@link LogAppender} on every call. */
+  class SynchronousLogAppenderFactory implements LogAppenderFactory {
+    @Override
+    public LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
+                                      FollowerInfo f) {
+      return new LogAppender(server, state, f);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
new file mode 100644
index 0000000..b532303
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import com.google.common.base.Preconditions;
+
+import java.util.*;
+
+import org.apache.ratis.protocol.RaftPeer;
+
+/**
+ * The peer configuration of a raft cluster.
+ *
+ * The peers are indexed by id and the configuration is never empty.
+ *
+ * The objects of this class are immutable.
+ */
+class PeerConfiguration {
+  private final Map<String, RaftPeer> peers;
+
+  /**
+   * @param peers the members of the configuration; a later peer with the
+   *              same id replaces an earlier one
+   * @throws NullPointerException if {@code peers} is null
+   * @throws IllegalStateException if {@code peers} is empty
+   */
+  PeerConfiguration(Iterable<RaftPeer> peers) {
+    Preconditions.checkNotNull(peers);
+    Map<String, RaftPeer> map = new HashMap<>();
+    for(RaftPeer p : peers) {
+      map.put(p.getId(), p);
+    }
+    this.peers = Collections.unmodifiableMap(map);
+    Preconditions.checkState(!this.peers.isEmpty());
+  }
+
+  /** @return an unmodifiable view of the member peers. */
+  Collection<RaftPeer> getPeers() {
+    return Collections.unmodifiableCollection(peers.values());
+  }
+
+  /** @return the number of member peers. */
+  int size() {
+    return peers.size();
+  }
+
+  @Override
+  public String toString() {
+    return peers.values().toString();
+  }
+
+  /** @return the member with the given id, or null if there is none. */
+  RaftPeer getPeer(String id) {
+    return peers.get(id);
+  }
+
+  /** @return true if a peer with the given id is a member. */
+  boolean contains(String id) {
+    return peers.containsKey(id);
+  }
+
+  /**
+   * @param selfId the id to exclude; must not be null
+   * @return a new list of all members whose id differs from {@code selfId}
+   */
+  List<RaftPeer> getOtherPeers(String selfId) {
+    List<RaftPeer> others = new ArrayList<>();
+    for (RaftPeer peer : peers.values()) {
+      if (!selfId.equals(peer.getId())) {
+        others.add(peer);
+      }
+    }
+    return others;
+  }
+
+  /**
+   * Checks whether {@code selfId} together with {@code others} covers a
+   * strict majority of this configuration. Only ids that are members of this
+   * configuration are counted.
+   *
+   * The decision is made after all ids have been counted, so the case where
+   * {@code others} is empty is handled too: in a single-peer configuration
+   * the peer alone is a majority.
+   *
+   * @param others the ids of the other peers; must not contain
+   *               {@code selfId}
+   * @param selfId the id of the local peer
+   * @return true if more than half of the members are covered
+   * @throws IllegalArgumentException if {@code others} contains
+   *         {@code selfId}
+   */
+  boolean hasMajority(Collection<String> others, String selfId) {
+    Preconditions.checkArgument(!others.contains(selfId));
+    int num = contains(selfId) ? 1 : 0;
+    for (String other : others) {
+      if (contains(other)) {
+        num++;
+      }
+    }
+    return num > size() / 2;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
new file mode 100644
index 0000000..bf47cdc
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.statemachine.TransactionContext;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A client request waiting for its reply, together with the future through
+ * which the reply or failure is delivered.
+ *
+ * Requests are ordered by their index. A setConfiguration request created
+ * through the single-argument constructor has the index
+ * {@code RaftServerConstants.INVALID_LOG_INDEX} and no transaction context.
+ */
+public class PendingRequest implements Comparable<PendingRequest> {
+  // primitive: the index is always set and is compared numerically
+  private final long index;
+  private final RaftClientRequest request;
+  private final TransactionContext entry;
+  private final CompletableFuture<RaftClientReply> future;
+
+  PendingRequest(long index, RaftClientRequest request,
+                 TransactionContext entry) {
+    this.index = index;
+    this.request = request;
+    this.entry = entry;
+    this.future = new CompletableFuture<>();
+  }
+
+  PendingRequest(SetConfigurationRequest request) {
+    this(RaftServerConstants.INVALID_LOG_INDEX, request, null);
+  }
+
+  long getIndex() {
+    return index;
+  }
+
+  RaftClientRequest getRequest() {
+    return request;
+  }
+
+  /** @return the future completed by the setReply/setException methods. */
+  public CompletableFuture<RaftClientReply> getFuture() {
+    return future;
+  }
+
+  /** @return the transaction context; null for setConfiguration requests. */
+  TransactionContext getEntry() {
+    return entry;
+  }
+
+  /**
+   * Completes the future exceptionally.
+   *
+   * @throws IllegalArgumentException if {@code e} is null
+   */
+  synchronized void setException(Throwable e) {
+    Preconditions.checkArgument(e != null);
+    future.completeExceptionally(e);
+  }
+
+  /**
+   * Completes the future with the given reply.
+   *
+   * @throws IllegalArgumentException if {@code r} is null
+   */
+  synchronized void setReply(RaftClientReply r) {
+    Preconditions.checkArgument(r != null);
+    future.complete(r);
+  }
+
+  /** Completes the future with a reply built from the request and message. */
+  void setSuccessReply(Message message) {
+    setReply(new RaftClientReply(getRequest(), message));
+  }
+
+  @Override
+  public int compareTo(PendingRequest that) {
+    return Long.compare(this.index, that.index);
+  }
+
+  @Override
+  public String toString() {
+    // the closing parenthesis was missing from the original output
+    return getClass().getSimpleName() + "(index=" + index
+        + ", request=" + request + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
new file mode 100644
index 0000000..6343344
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+class PendingRequests {
+  private static final Logger LOG = RaftServerImpl.LOG;
+
+  private PendingRequest pendingSetConf;
+  private final RaftServerImpl server;
+  private final ConcurrentMap<Long, PendingRequest> pendingRequests = new ConcurrentHashMap<>();
+  private PendingRequest last = null;
+
+  PendingRequests(RaftServerImpl server) {
+    this.server = server;
+  }
+
+  PendingRequest addPendingRequest(long index, RaftClientRequest request,
+      TransactionContext entry) {
+    // externally synced for now
+    Preconditions.checkArgument(!request.isReadOnly());
+    Preconditions.checkState(last == null || index == last.getIndex() + 1);
+    return add(index, request, entry);
+  }
+
+  private PendingRequest add(long index, RaftClientRequest request,
+      TransactionContext entry) {
+    final PendingRequest pending = new PendingRequest(index, request, entry);
+    pendingRequests.put(index, pending);
+    last = pending;
+    return pending;
+  }
+
+  PendingRequest addConfRequest(SetConfigurationRequest request) {
+    Preconditions.checkState(pendingSetConf == null);
+    pendingSetConf = new PendingRequest(request);
+    return pendingSetConf;
+  }
+
+  void replySetConfiguration() {
+    // we allow the pendingRequest to be null in case that the new leader
+    // commits the new configuration while it has not received the retry
+    // request from the client
+    if (pendingSetConf != null) {
+      // for setConfiguration we do not need to wait for statemachine. send back
+      // reply after it's committed.
+      pendingSetConf.setSuccessReply(null);
+      pendingSetConf = null;
+    }
+  }
+
+  void failSetConfiguration(RaftException e) {
+    Preconditions.checkState(pendingSetConf != null);
+    pendingSetConf.setException(e);
+    pendingSetConf = null;
+  }
+
+  TransactionContext getTransactionContext(long index) {
+    PendingRequest pendingRequest = pendingRequests.get(index);
+    // it is possible that the pendingRequest is null if this peer just becomes
+    // the new leader and commits transactions received by the previous leader
+    return pendingRequest != null ? pendingRequest.getEntry() : null;
+  }
+
+  void replyPendingRequest(long index, CompletableFuture<Message> messageFuture) {
+    final PendingRequest pending = pendingRequests.get(index);
+    if (pending != null) {
+      Preconditions.checkState(pending.getIndex() == index);
+
+      messageFuture.whenComplete((reply, exception) -> {
+        if (exception == null) {
+          pending.setSuccessReply(reply);
+        } else {
+          pending.setException(exception);
+        }
+      });
+    }
+  }
+
+  /**
+   * The leader state is stopped. Send NotLeaderException to all the pending
+   * requests since they have not got applied to the state machine yet.
+   */
+  void sendNotLeaderResponses() throws IOException {
+    LOG.info("{} sends responses before shutting down PendingRequestsHandler",
+        server.getId());
+
+    Collection<TransactionContext> pendingEntries = pendingRequests.values().stream()
+        .map(PendingRequest::getEntry).collect(Collectors.toList());
+    // notify the state machine about stepping down
+    server.getStateMachine().notifyNotLeader(pendingEntries);
+    pendingRequests.values().forEach(this::setNotLeaderException);
+    if (pendingSetConf != null) {
+      setNotLeaderException(pendingSetConf);
+    }
+  }
+
+  private void setNotLeaderException(PendingRequest pending) {
+    RaftClientReply reply = new RaftClientReply(pending.getRequest(),
+        server.generateNotLeaderException());
+    pending.setReply(reply);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
new file mode 100644
index 0000000..8fdd628
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.ratis.protocol.RaftPeer;
+
+/**
+ * The membership of the raft cluster.
+ *
+ * Without an old conf, a configuration is stable, i.e. there is no on-going
+ * peer change. With an old conf, it is transitional, i.e. in the middle of a
+ * peer change.
+ *
+ * The objects of this class are immutable.
+ */
+public class RaftConfiguration {
+  /** Create a {@link Builder}. */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /** To build {@link RaftConfiguration} objects; each property can be set only once. */
+  public static class Builder {
+    private PeerConfiguration conf;
+    private PeerConfiguration oldConf;
+    private long logEntryIndex = RaftServerConstants.INVALID_LOG_INDEX;
+
+    /** True once the conf was taken from a transitional configuration. */
+    private boolean forceStable = false;
+    /** True once the old conf was taken from a stable configuration. */
+    private boolean forceTransitional = false;
+
+    private Builder() {}
+
+    public Builder setConf(PeerConfiguration conf) {
+      Preconditions.checkNotNull(conf);
+      Preconditions.checkState(this.conf == null, "conf is already set.");
+      this.conf = conf;
+      return this;
+    }
+
+    public Builder setConf(Iterable<RaftPeer> peers) {
+      return setConf(new PeerConfiguration(peers));
+    }
+
+    public Builder setConf(RaftPeer[] peers) {
+      return setConf(Arrays.asList(peers));
+    }
+
+    /** Take the conf from the (new) conf of the given transitional configuration. */
+    Builder setConf(RaftConfiguration transitionalConf) {
+      Preconditions.checkNotNull(transitionalConf);
+      Preconditions.checkState(transitionalConf.isTransitional());
+      Preconditions.checkState(!forceTransitional);
+      forceStable = true;
+      return setConf(transitionalConf.conf);
+    }
+
+    public Builder setOldConf(PeerConfiguration oldConf) {
+      Preconditions.checkNotNull(oldConf);
+      Preconditions.checkState(this.oldConf == null, "oldConf is already set.");
+      this.oldConf = oldConf;
+      return this;
+    }
+
+    public Builder setOldConf(Iterable<RaftPeer> oldPeers) {
+      return setOldConf(new PeerConfiguration(oldPeers));
+    }
+
+    public Builder setOldConf(RaftPeer[] oldPeers) {
+      return setOldConf(Arrays.asList(oldPeers));
+    }
+
+    /** Take the old conf from the conf of the given stable configuration. */
+    Builder setOldConf(RaftConfiguration stableConf) {
+      Preconditions.checkNotNull(stableConf);
+      Preconditions.checkState(stableConf.isStable());
+      Preconditions.checkState(!forceStable);
+      forceTransitional = true;
+      return setOldConf(stableConf.conf);
+    }
+
+    public Builder setLogEntryIndex(long logEntryIndex) {
+      final long unset = RaftServerConstants.INVALID_LOG_INDEX;
+      Preconditions.checkArgument(logEntryIndex != unset);
+      Preconditions.checkState(this.logEntryIndex == unset,
+          "logEntryIndex is already set.");
+      this.logEntryIndex = logEntryIndex;
+      return this;
+    }
+
+    /** Build a {@link RaftConfiguration}. */
+    public RaftConfiguration build() {
+      // forceTransitional requires an old conf; forceStable forbids it
+      Preconditions.checkState(!forceTransitional || oldConf != null);
+      Preconditions.checkState(!forceStable || oldConf == null);
+      return new RaftConfiguration(conf, oldConf, logEntryIndex);
+    }
+  }
+
+  /** Non-null only if this configuration is transitional. */
+  private final PeerConfiguration oldConf;
+  /**
+   * The current peer configuration while this configuration is stable;
+   * or the new peer configuration while this configuration is transitional.
+   */
+  private final PeerConfiguration conf;
+
+  /** The index of the corresponding log entry for this configuration. */
+  private final long logEntryIndex;
+
+  private RaftConfiguration(PeerConfiguration conf, PeerConfiguration oldConf,
+      long logEntryIndex) {
+    this.conf = Preconditions.checkNotNull(conf);
+    this.oldConf = oldConf;
+    this.logEntryIndex = logEntryIndex;
+  }
+
+  /** Is this configuration transitional, i.e. in the middle of a peer change? */
+  boolean isTransitional() {
+    return oldConf != null;
+  }
+
+  /** Is this configuration stable, i.e. no on-going peer change? */
+  boolean isStable() {
+    return oldConf == null;
+  }
+
+  /** @return true iff the conf contains the given peer id. */
+  boolean containsInConf(String peerId) {
+    return conf.contains(peerId);
+  }
+
+  /** @return true iff there is an old conf and it contains the given peer id. */
+  boolean containsInOldConf(String peerId) {
+    return oldConf != null && oldConf.contains(peerId);
+  }
+
+  /** @return true iff the peer is in the conf and also in the old conf, if one exists. */
+  boolean contains(String peerId) {
+    if (!containsInConf(peerId)) {
+      return false;
+    }
+    return oldConf == null || containsInOldConf(peerId);
+  }
+
+  /**
+   * @return the peer corresponding to the given id;
+   *         or return null if the peer is not in this configuration.
+   */
+  public RaftPeer getPeer(String id) {
+    if (id == null) {
+      return null;
+    }
+    // look in the conf first, then fall back to the old conf if there is one
+    final RaftPeer found = conf.getPeer(id);
+    return (found != null || oldConf == null) ? found : oldConf.getPeer(id);
+  }
+
+  /** @return all the peers from the conf, and the old conf if it exists. */
+  public Collection<RaftPeer> getPeers() {
+    final Collection<RaftPeer> all = new ArrayList<>(conf.getPeers());
+    if (oldConf != null) {
+      for (RaftPeer oldPeer : oldConf.getPeers()) {
+        if (!all.contains(oldPeer)) {
+          all.add(oldPeer);
+        }
+      }
+    }
+    return all;
+  }
+
+  /**
+   * @return all the peers other than the given self id from the conf,
+   *         and the old conf if it exists.
+   */
+  public Collection<RaftPeer> getOtherPeers(String selfId) {
+    final Collection<RaftPeer> others = conf.getOtherPeers(selfId);
+    if (oldConf != null) {
+      for (RaftPeer oldPeer : oldConf.getOtherPeers(selfId)) {
+        if (!others.contains(oldPeer)) {
+          others.add(oldPeer);
+        }
+      }
+    }
+    return others;
+  }
+
+  /** @return true if the self id together with the others are in the majority. */
+  boolean hasMajority(Collection<String> others, String selfId) {
+    Preconditions.checkArgument(!others.contains(selfId));
+    if (!conf.hasMajority(others, selfId)) {
+      return false;
+    }
+    // a transitional configuration needs the majority of the old conf as well
+    return oldConf == null || oldConf.hasMajority(others, selfId);
+  }
+
+  @Override
+  public String toString() {
+    final String old = oldConf == null ? "" : "old:" + oldConf;
+    return conf + old;
+  }
+
+  /** @return true iff this is stable, of the same size as newMembers and contains all their ids. */
+  @VisibleForTesting
+  boolean hasNoChange(RaftPeer[] newMembers) {
+    return isStable()
+        && conf.size() == newMembers.length
+        && Arrays.stream(newMembers).allMatch(m -> conf.contains(m.getId()));
+  }
+
+  long getLogEntryIndex() {
+    return logEntryIndex;
+  }
+
+  /** @return the new members which are not contained in the conf of the old configuration. */
+  static Collection<RaftPeer> computeNewPeers(RaftPeer[] newMembers,
+      RaftConfiguration old) {
+    final List<RaftPeer> added = new ArrayList<>();
+    Arrays.stream(newMembers)
+        .filter(m -> !old.containsInConf(m.getId()))
+        .forEach(added::add);
+    return added;
+  }
+
+  /** @return a random peer of the conf other than the given id; or null if there is none. */
+  RaftPeer getRandomPeer(String exclusiveId) {
+    final List<RaftPeer> candidates = conf.getOtherPeers(exclusiveId);
+    return candidates.isEmpty() ? null
+        : candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
+  }
+
+  Collection<RaftPeer> getPeersInOldConf() {
+    return oldConf == null ? Collections.emptyList() : oldConf.getPeers();
+  }
+
+  Collection<RaftPeer> getPeersInConf() {
+    return conf.getPeers();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
new file mode 100644
index 0000000..caf9c4d
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.client.RaftClient;
+
+public interface RaftServerConstants {
+  long INVALID_LOG_INDEX = -1;
+  byte LOG_TERMINATE_BYTE = 0;
+  long DEFAULT_SEQNUM = RaftClient.DEFAULT_SEQNUM;
+
+  enum StartupOption {
+    FORMAT("format"),
+    REGULAR("regular");
+
+    private final String option; // the string identifying this option
+    StartupOption(String option) {
+      this.option = option;
+    }
+
+    /** @return the option with the given string; or REGULAR if there is none. */
+    public static StartupOption getOption(String arg) {
+      for (StartupOption candidate : values()) {
+        if (candidate.option.equals(arg)) {
+          return candidate;
+        }
+      }
+      return REGULAR;
+    }
+  }
+}