You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/01/31 21:16:49 UTC

[04/54] [abbrv] incubator-ratis git commit: Move o.a.r.s.* to o.a.r.s.impl.

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/LogAppender.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/LogAppender.java b/raft-server/src/main/java/org/apache/raft/server/LogAppender.java
deleted file mode 100644
index 1d2983b..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/LogAppender.java
+++ /dev/null
@@ -1,481 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.raft.server.LeaderState.StateUpdateEventType;
-import org.apache.raft.server.impl.ServerProtoUtils;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.FileInfo;
-import org.apache.raft.server.storage.RaftLog;
-import org.apache.raft.shaded.com.google.protobuf.ByteString;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.util.Daemon;
-import org.apache.raft.util.ProtoUtils;
-import org.apache.raft.util.Timestamp;
-import org.slf4j.Logger;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.file.Path;
-import java.util.*;
-
-import static org.apache.raft.server.RaftServerConfigKeys.*;
-import static org.apache.raft.server.RaftServerConstants.INVALID_LOG_INDEX;
-
-/**
- * A daemon thread appending log entries to a follower peer.
- */
-public class LogAppender extends Daemon {
-  public static final Logger LOG = RaftServer.LOG;
-
-  protected final RaftServer server;
-  private final LeaderState leaderState;
-  protected final RaftLog raftLog;
-  protected final FollowerInfo follower;
-  private final int maxBufferSize;
-  private final boolean batchSending;
-  private final LogEntryBuffer buffer;
-  private final long leaderTerm;
-
-  private volatile boolean sending = true;
-
-  public LogAppender(RaftServer server, LeaderState leaderState, FollowerInfo f) {
-    this.follower = f;
-    this.server = server;
-    this.leaderState = leaderState;
-    this.raftLog = server.getState().getLog();
-    this.maxBufferSize = server.getProperties().getInt(
-        RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_KEY,
-        RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_DEFAULT);
-    this.batchSending = server.getProperties().getBoolean(
-        RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_KEY,
-        RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_DEFAULT);
-    this.buffer = new LogEntryBuffer();
-    this.leaderTerm = server.getState().getCurrentTerm();
-  }
-
-  @Override
-  public String toString() {
-    return getClass().getSimpleName() + "(" + server.getId() + " -> " +
-        follower.getPeer().getId() + ")";
-  }
-
-  @Override
-  public void run() {
-    try {
-      checkAndSendAppendEntries();
-    } catch (InterruptedException | InterruptedIOException e) {
-      LOG.info(this + " was interrupted: " + e);
-    }
-  }
-
-  protected boolean isAppenderRunning() {
-    return sending;
-  }
-
-  public void stopSender() {
-    this.sending = false;
-  }
-
-  public FollowerInfo getFollower() {
-    return follower;
-  }
-
-  /**
-   * A buffer for log entries with size limitation.
-   */
-  private class LogEntryBuffer {
-    private final List<LogEntryProto> buf = new ArrayList<>();
-    private int totalSize = 0;
-
-    void addEntry(LogEntryProto entry) {
-      buf.add(entry);
-      totalSize += entry.getSerializedSize();
-    }
-
-    boolean isFull() {
-      return totalSize >= maxBufferSize;
-    }
-
-    boolean isEmpty() {
-      return buf.isEmpty();
-    }
-
-    AppendEntriesRequestProto getAppendRequest(TermIndex previous) {
-      final AppendEntriesRequestProto request = server
-          .createAppendEntriesRequest(leaderTerm, follower.getPeer().getId(),
-              previous, buf, !follower.isAttendingVote());
-      buf.clear();
-      totalSize = 0;
-      return request;
-    }
-
-    int getPendingEntryNum() {
-      return buf.size();
-    }
-  }
-
-  private TermIndex getPrevious() {
-    TermIndex previous = ServerProtoUtils.toTermIndex(
-        raftLog.get(follower.getNextIndex() - 1));
-    if (previous == null) {
-      // if previous is null, nextIndex must be equal to the log start
-      // index (otherwise we will install snapshot).
-      Preconditions.checkState(follower.getNextIndex() == raftLog.getStartIndex(),
-          "follower's next index %s, local log start index %s",
-          follower.getNextIndex(), raftLog.getStartIndex());
-      SnapshotInfo snapshot = server.getState().getLatestSnapshot();
-      previous = snapshot == null ? null : snapshot.getTermIndex();
-    }
-    return previous;
-  }
-
-  protected AppendEntriesRequestProto createRequest() {
-    final TermIndex previous = getPrevious();
-    final long leaderNext = raftLog.getNextIndex();
-    long next = follower.getNextIndex() + buffer.getPendingEntryNum();
-    boolean toSend = false;
-
-    if (leaderNext == next && !buffer.isEmpty()) {
-      // no new entries, then send out the entries in the buffer
-      toSend = true;
-    } else if (leaderNext > next) {
-      while (leaderNext > next && !buffer.isFull()) {
-        // stop adding entry once the buffer size is >= the max size
-        buffer.addEntry(raftLog.get(next++));
-      }
-      if (buffer.isFull() || !batchSending) {
-        // buffer is full or batch sending is disabled, send out a request
-        toSend = true;
-      }
-    }
-
-    if (toSend || shouldHeartbeat()) {
-      return buffer.getAppendRequest(previous);
-    }
-    return null;
-  }
-
-  /** Send an appendEntries RPC; retry indefinitely. */
-  private AppendEntriesReplyProto sendAppendEntriesWithRetries()
-      throws InterruptedException, InterruptedIOException {
-    int retry = 0;
-    AppendEntriesRequestProto request = null;
-    while (isAppenderRunning()) { // keep retrying for IOException
-      try {
-        if (request == null || request.getEntriesCount() == 0) {
-          request = createRequest();
-        }
-
-        if (request == null) {
-          LOG.trace("{} need not send AppendEntries now." +
-              " Wait for more entries.", server.getId());
-          return null;
-        } else if (!isAppenderRunning()) {
-          LOG.debug("LogAppender {} has been stopped. Skip the request.", this);
-          return null;
-        }
-
-        follower.updateLastRpcSendTime();
-        final AppendEntriesReplyProto r = server.getServerRpc()
-            .sendAppendEntries(request);
-        follower.updateLastRpcResponseTime();
-
-        return r;
-      } catch (InterruptedIOException iioe) {
-        throw iioe;
-      } catch (IOException ioe) {
-        LOG.debug(this + ": failed to send appendEntries; retry " + retry++, ioe);
-      }
-      if (isAppenderRunning()) {
-        Thread.sleep(leaderState.getSyncInterval());
-      }
-    }
-    return null;
-  }
-
-  protected class SnapshotRequestIter
-      implements Iterable<InstallSnapshotRequestProto> {
-    private final SnapshotInfo snapshot;
-    private final List<FileInfo> files;
-    private FileInputStream in;
-    private int fileIndex = 0;
-
-    private FileInfo currentFileInfo;
-    private byte[] currentBuf;
-    private long currentFileSize;
-    private long currentOffset = 0;
-    private int chunkIndex = 0;
-
-    private final String requestId;
-    private int requestIndex = 0;
-
-    public SnapshotRequestIter(SnapshotInfo snapshot, String requestId)
-        throws IOException {
-      this.snapshot = snapshot;
-      this.requestId = requestId;
-      this.files = snapshot.getFiles();
-      if (files.size() > 0) {
-        startReadFile();
-      }
-    }
-
-    private void startReadFile() throws IOException {
-      currentFileInfo = files.get(fileIndex);
-      File snapshotFile = currentFileInfo.getPath().toFile();
-      currentFileSize = snapshotFile.length();
-      final int bufLength =
-          (int) Math.min(leaderState.getSnapshotChunkMaxSize(), currentFileSize);
-      currentBuf = new byte[bufLength];
-      currentOffset = 0;
-      chunkIndex = 0;
-      in = new FileInputStream(snapshotFile);
-    }
-
-    @Override
-    public Iterator<InstallSnapshotRequestProto> iterator() {
-      return new Iterator<InstallSnapshotRequestProto>() {
-        @Override
-        public boolean hasNext() {
-          return fileIndex < files.size();
-        }
-
-        @Override
-        public InstallSnapshotRequestProto next() {
-          if (fileIndex >= files.size()) {
-            throw new NoSuchElementException();
-          }
-          int targetLength = (int) Math.min(currentFileSize - currentOffset,
-              leaderState.getSnapshotChunkMaxSize());
-          FileChunkProto chunk;
-          try {
-            chunk = readFileChunk(currentFileInfo, in, currentBuf,
-                targetLength, currentOffset, chunkIndex);
-            boolean done = (fileIndex == files.size() - 1) &&
-                chunk.getDone();
-            InstallSnapshotRequestProto request =
-                server.createInstallSnapshotRequest(follower.getPeer().getId(),
-                    requestId, requestIndex++, snapshot,
-                    Lists.newArrayList(chunk), done);
-            currentOffset += targetLength;
-            chunkIndex++;
-
-            if (currentOffset >= currentFileSize) {
-              in.close();
-              fileIndex++;
-              if (fileIndex < files.size()) {
-                startReadFile();
-              }
-            }
-
-            return request;
-          } catch (IOException e) {
-            if (in != null) {
-              try {
-                in.close();
-              } catch (IOException ignored) {
-              }
-            }
-            LOG.warn("Got exception when preparing InstallSnapshot request", e);
-            throw new RuntimeException(e);
-          }
-        }
-      };
-    }
-  }
-
-  private FileChunkProto readFileChunk(FileInfo fileInfo,
-      FileInputStream in, byte[] buf, int length, long offset, int chunkIndex)
-      throws IOException {
-    FileChunkProto.Builder builder = FileChunkProto.newBuilder()
-        .setOffset(offset).setChunkIndex(chunkIndex);
-    IOUtils.readFully(in, buf, 0, length);
-    Path relativePath = server.getState().getStorage().getStorageDir()
-        .relativizeToRoot(fileInfo.getPath());
-    builder.setFilename(relativePath.toString());
-    builder.setDone(offset + length == fileInfo.getFileSize());
-    builder.setFileDigest(
-        ByteString.copyFrom(fileInfo.getFileDigest().getDigest()));
-    builder.setData(ByteString.copyFrom(buf, 0, length));
-    return builder.build();
-  }
-
-  private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot)
-      throws InterruptedException, InterruptedIOException {
-    String requestId = UUID.randomUUID().toString();
-    InstallSnapshotReplyProto reply = null;
-    try {
-      for (InstallSnapshotRequestProto request :
-          new SnapshotRequestIter(snapshot, requestId)) {
-        follower.updateLastRpcSendTime();
-        reply = server.getServerRpc().sendInstallSnapshot(request);
-        follower.updateLastRpcResponseTime();
-
-        if (!reply.getServerReply().getSuccess()) {
-          return reply;
-        }
-      }
-    } catch (InterruptedIOException iioe) {
-      throw iioe;
-    } catch (Exception ioe) {
-      LOG.warn(this + ": failed to install SnapshotInfo " + snapshot.getFiles(),
-          ioe);
-      return null;
-    }
-
-    if (reply != null) {
-      follower.updateMatchIndex(snapshot.getTermIndex().getIndex());
-      follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1);
-      LOG.info("{}: install snapshot-{} successfully on follower {}",
-          server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer());
-    }
-    return reply;
-  }
-
-  protected SnapshotInfo shouldInstallSnapshot() {
-    final long logStartIndex = raftLog.getStartIndex();
-    // we should install snapshot if the follower needs to catch up and:
-    // 1. there is no local log entry but there is snapshot
-    // 2. or the follower's next index is smaller than the log start index
-    if (follower.getNextIndex() < raftLog.getNextIndex()) {
-      SnapshotInfo snapshot = server.getState().getLatestSnapshot();
-      if (follower.getNextIndex() < logStartIndex ||
-          (logStartIndex == INVALID_LOG_INDEX && snapshot != null)) {
-        return snapshot;
-      }
-    }
-    return null;
-  }
-
-  /** Check and send appendEntries RPC */
-  private void checkAndSendAppendEntries()
-      throws InterruptedException, InterruptedIOException {
-    while (isAppenderRunning()) {
-      if (shouldSendRequest()) {
-        SnapshotInfo snapshot = shouldInstallSnapshot();
-        if (snapshot != null) {
-          LOG.info("{}: follower {}'s next index is {}," +
-              " log's start index is {}, need to install snapshot",
-              server.getId(), follower.getPeer(), follower.getNextIndex(),
-              raftLog.getStartIndex());
-
-          final InstallSnapshotReplyProto r = installSnapshot(snapshot);
-          if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
-            checkResponseTerm(r.getTerm());
-          } // otherwise if r is null, retry the snapshot installation
-        } else {
-          final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
-          if (r != null) {
-            handleReply(r);
-          }
-        }
-      }
-      if (isAppenderRunning() && !shouldAppendEntries(
-          follower.getNextIndex() + buffer.getPendingEntryNum())) {
-        final long waitTime = getHeartbeatRemainingTime(
-            follower.getLastRpcTime());
-        if (waitTime > 0) {
-          synchronized (this) {
-            wait(waitTime);
-          }
-        }
-      }
-    }
-  }
-
-  private void handleReply(AppendEntriesReplyProto reply) {
-    if (reply != null) {
-      switch (reply.getResult()) {
-        case SUCCESS:
-          final long oldNextIndex = follower.getNextIndex();
-          final long nextIndex = reply.getNextIndex();
-          if (nextIndex < oldNextIndex) {
-            throw new IllegalStateException("nextIndex=" + nextIndex
-                + " < oldNextIndex=" + oldNextIndex
-                + ", reply=" + ProtoUtils.toString(reply));
-          }
-
-          if (nextIndex > oldNextIndex) {
-            follower.updateMatchIndex(nextIndex - 1);
-            follower.updateNextIndex(nextIndex);
-            submitEventOnSuccessAppend();
-          }
-          break;
-        case NOT_LEADER:
-          // check if should step down
-          checkResponseTerm(reply.getTerm());
-          break;
-        case INCONSISTENCY:
-          follower.decreaseNextIndex(reply.getNextIndex());
-          break;
-        case UNRECOGNIZED:
-          LOG.warn("{} received UNRECOGNIZED AppendResult from {}",
-              server.getId(), follower.getPeer().getId());
-          break;
-      }
-    }
-  }
-
-  protected void submitEventOnSuccessAppend() {
-    LeaderState.StateUpdateEvent e = follower.isAttendingVote() ?
-        LeaderState.UPDATE_COMMIT_EVENT :
-        LeaderState.STAGING_PROGRESS_EVENT;
-    leaderState.submitUpdateStateEvent(e);
-  }
-
-  public synchronized void notifyAppend() {
-    this.notify();
-  }
-
-  /** Should the leader send appendEntries RPC to this follower? */
-  protected boolean shouldSendRequest() {
-    return shouldAppendEntries(follower.getNextIndex()) || shouldHeartbeat();
-  }
-
-  private boolean shouldAppendEntries(long followerIndex) {
-    return followerIndex < raftLog.getNextIndex();
-  }
-
-  private boolean shouldHeartbeat() {
-    return getHeartbeatRemainingTime(follower.getLastRpcTime()) <= 0;
-  }
-
-  /**
-   * @return the time in milliseconds that the leader should send a heartbeat.
-   */
-  protected long getHeartbeatRemainingTime(Timestamp lastTime) {
-    return server.getMinTimeoutMs() / 2 - lastTime.elapsedTimeMs();
-  }
-
-  protected void checkResponseTerm(long responseTerm) {
-    synchronized (server) {
-      if (isAppenderRunning() && follower.isAttendingVote()
-          && responseTerm > leaderState.getCurrentTerm()) {
-        leaderState.submitUpdateStateEvent(
-            new LeaderState.StateUpdateEvent(StateUpdateEventType.STEPDOWN,
-                responseTerm));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/LogAppenderFactory.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/LogAppenderFactory.java b/raft-server/src/main/java/org/apache/raft/server/LogAppenderFactory.java
deleted file mode 100644
index 3cb2b06..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/LogAppenderFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-public interface LogAppenderFactory {
-  LogAppender getLogAppender(RaftServer server, LeaderState state,
-      FollowerInfo f);
-
-  class SynchronousLogAppenderFactory implements LogAppenderFactory {
-    @Override
-    public LogAppender getLogAppender(RaftServer server, LeaderState state,
-        FollowerInfo f) {
-      return new LogAppender(server, state, f);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/PeerConfiguration.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/PeerConfiguration.java b/raft-server/src/main/java/org/apache/raft/server/PeerConfiguration.java
deleted file mode 100644
index 9f01390..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/PeerConfiguration.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.protocol.RaftPeer;
-
-import java.util.*;
-
-/**
- * The peer configuration of a raft cluster.
- *
- * The objects of this class are immutable.
- */
-class PeerConfiguration {
-  private final Map<String, RaftPeer> peers;
-
-  PeerConfiguration(Iterable<RaftPeer> peers) {
-    Preconditions.checkNotNull(peers);
-    Map<String, RaftPeer> map = new HashMap<>();
-    for(RaftPeer p : peers) {
-      map.put(p.getId(), p);
-    }
-    this.peers = Collections.unmodifiableMap(map);
-    Preconditions.checkState(!this.peers.isEmpty());
-  }
-
-  Collection<RaftPeer> getPeers() {
-    return Collections.unmodifiableCollection(peers.values());
-  }
-
-  int size() {
-    return peers.size();
-  }
-
-  @Override
-  public String toString() {
-    return peers.values().toString();
-  }
-
-  RaftPeer getPeer(String id) {
-    return peers.get(id);
-  }
-
-  boolean contains(String id) {
-    return peers.containsKey(id);
-  }
-
-  List<RaftPeer> getOtherPeers(String selfId) {
-    List<RaftPeer> others = new ArrayList<>();
-    for (Map.Entry<String, RaftPeer> entry : peers.entrySet()) {
-      if (!selfId.equals(entry.getValue().getId())) {
-        others.add(entry.getValue());
-      }
-    }
-    return others;
-  }
-
-  boolean hasMajority(Collection<String> others, String selfId) {
-    Preconditions.checkArgument(!others.contains(selfId));
-    int num = 0;
-    if (contains(selfId)) {
-      num++;
-    }
-    for (String other : others) {
-      if (contains(other)) {
-        num++;
-      }
-      if (num > size() / 2) {
-        return true;
-      }
-    }
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/PendingRequest.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/PendingRequest.java b/raft-server/src/main/java/org/apache/raft/server/PendingRequest.java
deleted file mode 100644
index 3598349..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/PendingRequest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.protocol.Message;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.protocol.SetConfigurationRequest;
-import org.apache.raft.statemachine.TransactionContext;
-
-import java.util.concurrent.CompletableFuture;
-
-public class PendingRequest implements Comparable<PendingRequest> {
-  private final Long index;
-  private final RaftClientRequest request;
-  private final TransactionContext entry;
-  private final CompletableFuture<RaftClientReply> future;
-
-  PendingRequest(long index, RaftClientRequest request,
-                 TransactionContext entry) {
-    this.index = index;
-    this.request = request;
-    this.entry = entry;
-    this.future = new CompletableFuture<>();
-  }
-
-  PendingRequest(SetConfigurationRequest request) {
-    this(RaftServerConstants.INVALID_LOG_INDEX, request, null);
-  }
-
-  long getIndex() {
-    return index;
-  }
-
-  RaftClientRequest getRequest() {
-    return request;
-  }
-
-  public CompletableFuture<RaftClientReply> getFuture() {
-    return future;
-  }
-
-  TransactionContext getEntry() {
-    return entry;
-  }
-
-  synchronized void setException(Throwable e) {
-    Preconditions.checkArgument(e != null);
-    future.completeExceptionally(e);
-  }
-
-  synchronized void setReply(RaftClientReply r) {
-    Preconditions.checkArgument(r != null);
-    future.complete(r);
-  }
-
-  void setSuccessReply(Message message) {
-    setReply(new RaftClientReply(getRequest(), message));
-  }
-
-  @Override
-  public int compareTo(PendingRequest that) {
-    return Long.compare(this.index, that.index);
-  }
-
-  @Override
-  public String toString() {
-    return getClass().getSimpleName() + "(index=" + index
-        + ", request=" + request;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/PendingRequests.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/PendingRequests.java b/raft-server/src/main/java/org/apache/raft/server/PendingRequests.java
deleted file mode 100644
index a5731fd..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/PendingRequests.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.stream.Collectors;
-
-import org.apache.raft.protocol.Message;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.protocol.RaftException;
-import org.apache.raft.protocol.SetConfigurationRequest;
-import org.apache.raft.statemachine.TransactionContext;
-import org.slf4j.Logger;
-
-import com.google.common.base.Preconditions;
-
-class PendingRequests {
-  private static final Logger LOG = RaftServer.LOG;
-
-  private PendingRequest pendingSetConf;
-  private final RaftServer server;
-  private final ConcurrentMap<Long, PendingRequest> pendingRequests = new ConcurrentHashMap<>();
-  private PendingRequest last = null;
-
-  PendingRequests(RaftServer server) {
-    this.server = server;
-  }
-
-  PendingRequest addPendingRequest(long index, RaftClientRequest request,
-      TransactionContext entry) {
-    // externally synced for now
-    Preconditions.checkArgument(!request.isReadOnly());
-    Preconditions.checkState(last == null || index == last.getIndex() + 1);
-    return add(index, request, entry);
-  }
-
-  private PendingRequest add(long index, RaftClientRequest request,
-      TransactionContext entry) {
-    final PendingRequest pending = new PendingRequest(index, request, entry);
-    pendingRequests.put(index, pending);
-    last = pending;
-    return pending;
-  }
-
-  PendingRequest addConfRequest(SetConfigurationRequest request) {
-    Preconditions.checkState(pendingSetConf == null);
-    pendingSetConf = new PendingRequest(request);
-    return pendingSetConf;
-  }
-
-  void replySetConfiguration() {
-    // we allow the pendingRequest to be null in case that the new leader
-    // commits the new configuration while it has not received the retry
-    // request from the client
-    if (pendingSetConf != null) {
-      // for setConfiguration we do not need to wait for statemachine. send back
-      // reply after it's committed.
-      pendingSetConf.setSuccessReply(null);
-      pendingSetConf = null;
-    }
-  }
-
-  void failSetConfiguration(RaftException e) {
-    Preconditions.checkState(pendingSetConf != null);
-    pendingSetConf.setException(e);
-    pendingSetConf = null;
-  }
-
-  TransactionContext getTransactionContext(long index) {
-    PendingRequest pendingRequest = pendingRequests.get(index);
-    // it is possible that the pendingRequest is null if this peer just becomes
-    // the new leader and commits transactions received by the previous leader
-    return pendingRequest != null ? pendingRequest.getEntry() : null;
-  }
-
-  void replyPendingRequest(long index, CompletableFuture<Message> messageFuture) {
-    final PendingRequest pending = pendingRequests.get(index);
-    if (pending != null) {
-      Preconditions.checkState(pending.getIndex() == index);
-
-      messageFuture.whenComplete((reply, exception) -> {
-        if (exception == null) {
-          pending.setSuccessReply(reply);
-        } else {
-          pending.setException(exception);
-        }
-      });
-    }
-  }
-
-  /**
-   * The leader state is stopped. Send NotLeaderException to all the pending
-   * requests since they have not got applied to the state machine yet.
-   */
-  void sendNotLeaderResponses() throws IOException {
-    LOG.info("{} sends responses before shutting down PendingRequestsHandler",
-        server.getId());
-
-    Collection<TransactionContext> pendingEntries = pendingRequests.values().stream()
-        .map(PendingRequest::getEntry).collect(Collectors.toList());
-    // notify the state machine about stepping down
-    server.getStateMachine().notifyNotLeader(pendingEntries);
-    pendingRequests.values().forEach(this::setNotLeaderException);
-    if (pendingSetConf != null) {
-      setNotLeaderException(pendingSetConf);
-    }
-  }
-
-  private void setNotLeaderException(PendingRequest pending) {
-    RaftClientReply reply = new RaftClientReply(pending.getRequest(),
-        server.generateNotLeaderException());
-    pending.setReply(reply);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RaftConfiguration.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftConfiguration.java b/raft-server/src/main/java/org/apache/raft/server/RaftConfiguration.java
deleted file mode 100644
index 54ed9d6..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/RaftConfiguration.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.raft.protocol.RaftPeer;
-
-import java.util.*;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * The configuration of the raft cluster.
- *
- * The configuration is stable if there is no on-going peer change. Otherwise,
- * the configuration is transitional, i.e. in the middle of a peer change.
- *
- * The objects of this class are immutable.
- */
-public class RaftConfiguration {
-  /** Create a {@link Builder}. */
-  public static Builder newBuilder() {
-    return new Builder();
-  }
-
-  /** To build {@link RaftConfiguration} objects. */
-  public static class Builder {
-    private PeerConfiguration oldConf;
-    private PeerConfiguration conf;
-    private long logEntryIndex = RaftServerConstants.INVALID_LOG_INDEX;
-
-    private boolean forceStable = false;
-    private boolean forceTransitional = false;
-
-    private Builder() {}
-
-    public Builder setConf(PeerConfiguration conf) {
-      Preconditions.checkNotNull(conf);
-      Preconditions.checkState(this.conf == null, "conf is already set.");
-      this.conf = conf;
-      return this;
-    }
-
-    public Builder setConf(Iterable<RaftPeer> peers) {
-      return setConf(new PeerConfiguration(peers));
-    }
-
-    public Builder setConf(RaftPeer[] peers) {
-      return setConf(Arrays.asList(peers));
-    }
-
-    Builder setConf(RaftConfiguration transitionalConf) {
-      Preconditions.checkNotNull(transitionalConf);
-      Preconditions.checkState(transitionalConf.isTransitional());
-
-      Preconditions.checkState(!forceTransitional);
-      forceStable = true;
-      return setConf(transitionalConf.conf);
-    }
-
-
-    public Builder setOldConf(PeerConfiguration oldConf) {
-      Preconditions.checkNotNull(oldConf);
-      Preconditions.checkState(this.oldConf == null, "oldConf is already set.");
-      this.oldConf = oldConf;
-      return this;
-    }
-
-    public Builder setOldConf(Iterable<RaftPeer> oldPeers) {
-      return setOldConf(new PeerConfiguration(oldPeers));
-    }
-
-    public Builder setOldConf(RaftPeer[] oldPeers) {
-      return setOldConf(Arrays.asList(oldPeers));
-    }
-
-    Builder setOldConf(RaftConfiguration stableConf) {
-      Preconditions.checkNotNull(stableConf);
-      Preconditions.checkState(stableConf.isStable());
-
-      Preconditions.checkState(!forceStable);
-      forceTransitional = true;
-      return setOldConf(stableConf.conf);
-    }
-
-    public Builder setLogEntryIndex(long logEntryIndex) {
-      Preconditions.checkArgument(
-          logEntryIndex != RaftServerConstants.INVALID_LOG_INDEX);
-      Preconditions.checkState(
-          this.logEntryIndex == RaftServerConstants.INVALID_LOG_INDEX,
-          "logEntryIndex is already set.");
-      this.logEntryIndex = logEntryIndex;
-      return this;
-    }
-
-    /** Build a {@link RaftConfiguration}. */
-    public RaftConfiguration build() {
-      if (forceTransitional) {
-        Preconditions.checkState(oldConf != null);
-      }
-      if (forceStable) {
-        Preconditions.checkState(oldConf == null);
-      }
-      return new RaftConfiguration(conf, oldConf, logEntryIndex);
-    }
-  }
-
-  /** Non-null only if this configuration is transitional. */
-  private final PeerConfiguration oldConf;
-  /**
-   * The current peer configuration while this configuration is stable;
-   * or the new peer configuration while this configuration is transitional.
-   */
-  private final PeerConfiguration conf;
-
-  /** The index of the corresponding log entry for this configuration. */
-  private final long logEntryIndex;
-
-  private RaftConfiguration(PeerConfiguration conf, PeerConfiguration oldConf,
-      long logEntryIndex) {
-    Preconditions.checkNotNull(conf);
-    this.conf = conf;
-    this.oldConf = oldConf;
-    this.logEntryIndex = logEntryIndex;
-  }
-
-  /** Is this configuration transitional, i.e. in the middle of a peer change? */
-  public boolean isTransitional() {
-    return oldConf != null;
-  }
-
-  /** Is this configuration stable, i.e. no on-going peer change? */
-  public boolean isStable() {
-    return oldConf == null;
-  }
-
-  boolean containsInConf(String peerId) {
-    return conf.contains(peerId);
-  }
-
-  boolean containsInOldConf(String peerId) {
-    return oldConf != null && oldConf.contains(peerId);
-  }
-
-  public boolean contains(String peerId) {
-    return containsInConf(peerId) && (oldConf == null || containsInOldConf(peerId));
-  }
-
-  /**
-   * @return the peer corresponding to the given id;
-   *         or return null if the peer is not in this configuration.
-   */
-  public RaftPeer getPeer(String id) {
-    if (id == null) {
-      return null;
-    }
-    RaftPeer peer = conf.getPeer(id);
-    if (peer != null) {
-      return peer;
-    } else if (oldConf != null) {
-      return oldConf.getPeer(id);
-    }
-    return null;
-  }
-
-  /** @return all the peers from the conf, and the old conf if it exists. */
-  public Collection<RaftPeer> getPeers() {
-    final Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers());
-    if (oldConf != null) {
-      oldConf.getPeers().stream().filter(p -> !peers.contains(p))
-          .forEach(peers::add);
-    }
-    return peers;
-  }
-
-  /**
-   * @return all the peers other than the given self id from the conf,
-   *         and the old conf if it exists.
-   */
-  public Collection<RaftPeer> getOtherPeers(String selfId) {
-    Collection<RaftPeer> others = conf.getOtherPeers(selfId);
-    if (oldConf != null) {
-      oldConf.getOtherPeers(selfId).stream()
-          .filter(p -> !others.contains(p))
-          .forEach(others::add);
-    }
-    return others;
-  }
-
-  /** @return true if the self id together with the others are in the majority. */
-  public boolean hasMajority(Collection<String> others, String selfId) {
-    Preconditions.checkArgument(!others.contains(selfId));
-    return conf.hasMajority(others, selfId) &&
-        (oldConf == null || oldConf.hasMajority(others, selfId));
-  }
-
-  @Override
-  public String toString() {
-    return conf + (oldConf != null ? "old:" + oldConf : "");
-  }
-
-  @VisibleForTesting
-  boolean hasNoChange(RaftPeer[] newMembers) {
-    if (!isStable() || conf.size() != newMembers.length) {
-      return false;
-    }
-    for (RaftPeer peer : newMembers) {
-      if (!conf.contains(peer.getId())) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  long getLogEntryIndex() {
-    return logEntryIndex;
-  }
-
-  static Collection<RaftPeer> computeNewPeers(RaftPeer[] newMembers,
-      RaftConfiguration old) {
-    List<RaftPeer> peers = new ArrayList<>();
-    for (RaftPeer p : newMembers) {
-      if (!old.containsInConf(p.getId())) {
-        peers.add(p);
-      }
-    }
-    return peers;
-  }
-
-  RaftPeer getRandomPeer(String exclusiveId) {
-    final List<RaftPeer> peers = conf.getOtherPeers(exclusiveId);
-    if (peers.isEmpty()) {
-      return null;
-    }
-    final int index = ThreadLocalRandom.current().nextInt(peers.size());
-    return peers.get(index);
-  }
-
-  public Collection<RaftPeer> getPeersInOldConf() {
-    return oldConf != null ? oldConf.getPeers() : Collections.emptyList();
-  }
-
-  public Collection<RaftPeer> getPeersInConf() {
-    return conf.getPeers();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServer.java b/raft-server/src/main/java/org/apache/raft/server/RaftServer.java
deleted file mode 100644
index c7ea6b2..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/RaftServer.java
+++ /dev/null
@@ -1,750 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.*;
-import org.apache.raft.server.protocol.RaftServerProtocol;
-import org.apache.raft.server.impl.ServerProtoUtils;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.FileInfo;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.CodeInjectionForTesting;
-import org.apache.raft.util.LifeCycle;
-import org.apache.raft.util.ProtoUtils;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.OptionalLong;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.raft.server.LeaderState.UPDATE_COMMIT_EVENT;
-import static org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*;
-import static org.apache.raft.util.LifeCycle.State.*;
-
-public class RaftServer implements RaftServerProtocol, Closeable {
-  public static final Logger LOG = LoggerFactory.getLogger(RaftServer.class);
-
-  private static final String CLASS_NAME = RaftServer.class.getSimpleName();
-  static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
-  static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
-  static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
-
-
-  private final int minTimeoutMs;
-  private final int maxTimeoutMs;
-
-  private final LifeCycle lifeCycle;
-  private final ServerState state;
-  private final StateMachine stateMachine;
-  private final RaftProperties properties;
-  private volatile Role role;
-
-  /** used when the peer is follower, to monitor election timeout */
-  private volatile FollowerState heartbeatMonitor;
-
-  /** used when the peer is candidate, to request votes from other peers */
-  private volatile LeaderElection electionDaemon;
-
-  /** used when the peer is leader */
-  private volatile LeaderState leaderState;
-
-  private RaftServerRpc serverRpc;
-
-  private final LogAppenderFactory appenderFactory;
-
-  public RaftServer(String id, RaftConfiguration raftConf,
-      RaftProperties properties, StateMachine stateMachine) throws IOException {
-    this.lifeCycle = new LifeCycle(id);
-    minTimeoutMs = properties.getInt(
-        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY,
-        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT);
-    maxTimeoutMs = properties.getInt(
-        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY,
-        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT);
-    Preconditions.checkArgument(maxTimeoutMs > minTimeoutMs,
-        "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
-    this.properties = properties;
-    this.stateMachine = stateMachine;
-    this.state = new ServerState(id, raftConf, properties, this, stateMachine);
-    appenderFactory = initAppenderFactory();
-  }
-
-  public int getMinTimeoutMs() {
-    return minTimeoutMs;
-  }
-
-  public int getMaxTimeoutMs() {
-    return maxTimeoutMs;
-  }
-
-  public int getRandomTimeoutMs() {
-    return RaftUtils.getRandomBetween(minTimeoutMs, maxTimeoutMs);
-  }
-
-  public StateMachine getStateMachine() {
-    return this.stateMachine;
-  }
-
-  public LogAppenderFactory getLogAppenderFactory() {
-    return appenderFactory;
-  }
-
-  private LogAppenderFactory initAppenderFactory() {
-    Class<? extends LogAppenderFactory> factoryClass = properties.getClass(
-        RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
-        RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT,
-        LogAppenderFactory.class);
-    return RaftUtils.newInstance(factoryClass);
-  }
-
-  /**
-   * Used by tests to set initial raft configuration with correct port bindings.
-   */
-  @VisibleForTesting
-  public void setInitialConf(RaftConfiguration conf) {
-    this.state.setInitialConf(conf);
-  }
-
-  public void setServerRpc(RaftServerRpc serverRpc) {
-    this.serverRpc = serverRpc;
-    // add peers into rpc service
-    RaftConfiguration conf = getRaftConf();
-    if (conf != null) {
-      addPeersToRPC(conf.getPeers());
-    }
-  }
-
-  public RaftServerRpc getServerRpc() {
-    return serverRpc;
-  }
-
-  public void start() {
-    lifeCycle.transition(STARTING);
-    state.start();
-    RaftConfiguration conf = getRaftConf();
-    if (conf != null && conf.contains(getId())) {
-      LOG.debug("{} starts as a follower", getId());
-      startAsFollower();
-    } else {
-      LOG.debug("{} starts with initializing state", getId());
-      startInitializing();
-    }
-  }
-
-  /**
-   * The peer belongs to the current configuration, should start as a follower
-   */
-  private void startAsFollower() {
-    role = Role.FOLLOWER;
-    heartbeatMonitor = new FollowerState(this);
-    heartbeatMonitor.start();
-
-    serverRpc.start();
-    lifeCycle.transition(RUNNING);
-  }
-
-  /**
-   * The peer does not have any configuration (maybe it will later be included
-   * in some configuration). Start still as a follower but will not vote or
-   * start election.
-   */
-  private void startInitializing() {
-    role = Role.FOLLOWER;
-    // do not start heartbeatMonitoring
-    serverRpc.start();
-  }
-
-  public ServerState getState() {
-    return this.state;
-  }
-
-  public String getId() {
-    return getState().getSelfId();
-  }
-
-  public RaftConfiguration getRaftConf() {
-    return getState().getRaftConf();
-  }
-
-  @Override
-  public void close() {
-    lifeCycle.checkStateAndClose(() -> {
-      try {
-        shutdownHeartbeatMonitor();
-        shutdownElectionDaemon();
-        shutdownLeaderState();
-
-        serverRpc.shutdown();
-        state.close();
-      } catch (Exception ignored) {
-        LOG.warn("Failed to kill " + state.getSelfId(), ignored);
-      }
-    });
-  }
-
-  public boolean isAlive() {
-    return !lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED);
-  }
-
-  public boolean isFollower() {
-    return role == Role.FOLLOWER;
-  }
-
-  public boolean isCandidate() {
-    return role == Role.CANDIDATE;
-  }
-
-  public boolean isLeader() {
-    return role == Role.LEADER;
-  }
-
-  Role getRole() {
-    return role;
-  }
-
-  /**
-   * Change the server state to Follower if necessary
-   * @param newTerm The new term.
-   * @param sync We will call {@link ServerState#persistMetadata()} if this is
-   *             set to true and term/votedFor get updated.
-   * @return if the term/votedFor should be updated to the new term
-   * @throws IOException if term/votedFor persistence failed.
-   */
-  synchronized boolean changeToFollower(long newTerm, boolean sync)
-      throws IOException {
-    final Role old = role;
-    role = Role.FOLLOWER;
-
-    boolean metadataUpdated = false;
-    if (newTerm > state.getCurrentTerm()) {
-      state.setCurrentTerm(newTerm);
-      state.resetLeaderAndVotedFor();
-      metadataUpdated = true;
-    }
-
-    if (old == Role.LEADER) {
-      assert leaderState != null;
-      shutdownLeaderState();
-    } else if (old == Role.CANDIDATE) {
-      shutdownElectionDaemon();
-    }
-
-    if (old != Role.FOLLOWER) {
-      heartbeatMonitor = new FollowerState(this);
-      heartbeatMonitor.start();
-    }
-
-    if (metadataUpdated && sync) {
-      state.persistMetadata();
-    }
-    return metadataUpdated;
-  }
-
-  private synchronized void shutdownLeaderState() {
-    final LeaderState leader = leaderState;
-    if (leader != null) {
-      leader.stop();
-    }
-    leaderState = null;
-    // TODO: make sure that StateMachineUpdater has applied all transactions that have context
-  }
-
-  private void shutdownElectionDaemon() {
-    final LeaderElection election = electionDaemon;
-    if (election != null) {
-      election.stopRunning();
-      // no need to interrupt the election thread
-    }
-    electionDaemon = null;
-  }
-
-  synchronized void changeToLeader() {
-    Preconditions.checkState(isCandidate());
-    shutdownElectionDaemon();
-    role = Role.LEADER;
-    state.becomeLeader();
-    // start sending AppendEntries RPC to followers
-    leaderState = new LeaderState(this, properties);
-    leaderState.start();
-  }
-
-  private void shutdownHeartbeatMonitor() {
-    final FollowerState hm = heartbeatMonitor;
-    if (hm != null) {
-      hm.stopRunning();
-      hm.interrupt();
-    }
-    heartbeatMonitor = null;
-  }
-
-  synchronized void changeToCandidate() {
-    Preconditions.checkState(isFollower());
-    shutdownHeartbeatMonitor();
-    role = Role.CANDIDATE;
-    // start election
-    electionDaemon = new LeaderElection(this);
-    electionDaemon.start();
-  }
-
-  @Override
-  public String toString() {
-    return role + " " + state + " " + lifeCycle.getCurrentState();
-  }
-
-  /**
-   * @return null if the server is in leader state.
-   */
-  CompletableFuture<RaftClientReply> checkLeaderState(
-      RaftClientRequest request) {
-    if (!isLeader()) {
-      NotLeaderException exception = generateNotLeaderException();
-      CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
-      future.complete(new RaftClientReply(request, exception));
-      return future;
-    }
-    return null;
-  }
-
-  NotLeaderException generateNotLeaderException() {
-    if (lifeCycle.getCurrentState() != RUNNING) {
-      return new NotLeaderException(getId(), null, null);
-    }
-    String leaderId = state.getLeaderId();
-    if (leaderId == null || leaderId.equals(state.getSelfId())) {
-      // No idea about who is the current leader. Or the peer is the current
-      // leader, but it is about to step down
-      RaftPeer suggestedLeader = state.getRaftConf()
-          .getRandomPeer(state.getSelfId());
-      leaderId = suggestedLeader == null ? null : suggestedLeader.getId();
-    }
-    RaftConfiguration conf = getRaftConf();
-    Collection<RaftPeer> peers = conf.getPeers();
-    return new NotLeaderException(getId(), conf.getPeer(leaderId),
-        peers.toArray(new RaftPeer[peers.size()]));
-  }
-
-  /**
-   * Handle a normal update request from client.
-   */
-  public CompletableFuture<RaftClientReply> appendTransaction(
-      RaftClientRequest request, TransactionContext entry)
-      throws RaftException {
-    LOG.debug("{}: receive client request({})", getId(), request);
-    lifeCycle.assertCurrentState(RUNNING);
-    CompletableFuture<RaftClientReply> reply;
-
-    final PendingRequest pending;
-    synchronized (this) {
-      reply = checkLeaderState(request);
-      if (reply != null) {
-        return reply;
-      }
-
-      // append the message to its local log
-      final long entryIndex;
-      try {
-        entryIndex = state.applyLog(entry);
-      } catch (IOException e) {
-        throw new RaftException(e);
-      }
-
-      // put the request into the pending queue
-      pending = leaderState.addPendingRequest(entryIndex, request, entry);
-      leaderState.notifySenders();
-    }
-    return pending.getFuture();
-  }
-
-  /**
-   * Handle a raft configuration change request from client.
-   */
-  public CompletableFuture<RaftClientReply> setConfiguration(
-      SetConfigurationRequest request) throws IOException {
-    LOG.debug("{}: receive setConfiguration({})", getId(), request);
-    lifeCycle.assertCurrentState(RUNNING);
-    CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
-    if (reply != null) {
-      return reply;
-    }
-
-    final RaftPeer[] peersInNewConf = request.getPeersInNewConf();
-    final PendingRequest pending;
-    synchronized (this) {
-      reply = checkLeaderState(request);
-      if (reply != null) {
-        return reply;
-      }
-
-      final RaftConfiguration current = getRaftConf();
-      // make sure there is no other raft reconfiguration in progress
-      if (!current.isStable() || leaderState.inStagingState() ||
-          !state.isCurrentConfCommitted()) {
-        throw new ReconfigurationInProgressException(
-            "Reconfiguration is already in progress: " + current);
-      }
-
-      // return true if the new configuration is the same with the current one
-      if (current.hasNoChange(peersInNewConf)) {
-        pending = leaderState.returnNoConfChange(request);
-        return pending.getFuture();
-      }
-
-      // add new peers into the rpc service
-      addPeersToRPC(Arrays.asList(peersInNewConf));
-      // add staging state into the leaderState
-      pending = leaderState.startSetConfiguration(request);
-    }
-    return pending.getFuture();
-  }
-
-  private boolean shouldWithholdVotes() {
-    return isLeader() || (isFollower() && state.hasLeader()
-        && heartbeatMonitor.shouldWithholdVotes());
-  }
-
-  /**
-   * check if the remote peer is not included in the current conf
-   * and should shutdown. should shutdown if all the following stands:
-   * 1. this is a leader
-   * 2. current conf is stable and has been committed
-   * 3. candidate id is not included in conf
-   * 4. candidate's last entry's index < conf's index
-   */
-  private boolean shouldSendShutdown(String candidateId,
-      TermIndex candidateLastEntry) {
-    return isLeader()
-        && getRaftConf().isStable()
-        && getState().isConfCommitted()
-        && !getRaftConf().containsInConf(candidateId)
-        && candidateLastEntry.getIndex() < getRaftConf().getLogEntryIndex()
-        && !leaderState.isBootStrappingPeer(candidateId);
-  }
-
-  @Override
-  public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
-      throws IOException {
-    final String candidateId = r.getServerRequest().getRequestorId();
-    return requestVote(candidateId, r.getCandidateTerm(),
-        ServerProtoUtils.toTermIndex(r.getCandidateLastEntry()));
-  }
-
-  private RequestVoteReplyProto requestVote(String candidateId,
-      long candidateTerm, TermIndex candidateLastEntry) throws IOException {
-    CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
-        candidateId, candidateTerm, candidateLastEntry);
-    LOG.debug("{}: receive requestVote({}, {}, {})",
-        getId(), candidateId, candidateTerm, candidateLastEntry);
-    lifeCycle.assertCurrentState(RUNNING);
-
-    boolean voteGranted = false;
-    boolean shouldShutdown = false;
-    final RequestVoteReplyProto reply;
-    synchronized (this) {
-      if (shouldWithholdVotes()) {
-        LOG.info("{} Withhold vote from server {} with term {}. " +
-            "This server:{}, last rpc time from leader {} is {}", getId(),
-            candidateId, candidateTerm, this, this.getState().getLeaderId(),
-            (isFollower() ? heartbeatMonitor.getLastRpcTime() : -1));
-      } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
-        boolean termUpdated = changeToFollower(candidateTerm, false);
-        // see Section 5.4.1 Election restriction
-        if (state.isLogUpToDate(candidateLastEntry)) {
-          heartbeatMonitor.updateLastRpcTime(false);
-          state.grantVote(candidateId);
-          voteGranted = true;
-        }
-        if (termUpdated || voteGranted) {
-          state.persistMetadata(); // sync metafile
-        }
-      }
-      if (!voteGranted && shouldSendShutdown(candidateId, candidateLastEntry)) {
-        shouldShutdown = true;
-      }
-      reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(),
-          voteGranted, state.getCurrentTerm(), shouldShutdown);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("{} replies to vote request: {}. Peer's state: {}",
-            getId(), ProtoUtils.toString(reply), state);
-      }
-    }
-    return reply;
-  }
-
-  private void validateEntries(long expectedTerm, TermIndex previous,
-      LogEntryProto... entries) {
-    if (entries != null && entries.length > 0) {
-      final long index0 = entries[0].getIndex();
-
-      if (previous == null || previous.getTerm() == 0) {
-        Preconditions.checkArgument(index0 == 0,
-            "Unexpected Index: previous is null but entries[%s].getIndex()=%s",
-            0, index0);
-      } else {
-        Preconditions.checkArgument(previous.getIndex() == index0 - 1,
-            "Unexpected Index: previous is %s but entries[%s].getIndex()=%s",
-            previous, 0, index0);
-      }
-
-      for (int i = 0; i < entries.length; i++) {
-        final long t = entries[i].getTerm();
-        Preconditions.checkArgument(expectedTerm >= t,
-            "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s",
-            i, t, expectedTerm);
-
-        final long indexi = entries[i].getIndex();
-        Preconditions.checkArgument(indexi == index0 + i,
-            "Unexpected Index: entries[%s].getIndex()=%s but entries[0].getIndex()=%s",
-            i, indexi, index0);
-      }
-    }
-  }
-
-  @Override
-  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
-      throws IOException {
-    // TODO avoid converting list to array
-    final LogEntryProto[] entries = r.getEntriesList()
-        .toArray(new LogEntryProto[r.getEntriesCount()]);
-    final TermIndex previous = r.hasPreviousLog() ?
-        ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null;
-    return appendEntries(r.getServerRequest().getRequestorId(),
-        r.getLeaderTerm(), previous, r.getLeaderCommit(), r.getInitializing(),
-        entries);
-  }
-
-  private AppendEntriesReplyProto appendEntries(String leaderId, long leaderTerm,
-      TermIndex previous, long leaderCommit, boolean initializing,
-      LogEntryProto... entries) throws IOException {
-    CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
-        leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("{}: receive appendEntries({}, {}, {}, {}, {}, {})", getId(),
-          leaderId, leaderTerm, previous, leaderCommit, initializing,
-          ServerProtoUtils.toString(entries));
-    }
-    lifeCycle.assertCurrentState(STARTING, RUNNING);
-
-    try {
-      validateEntries(leaderTerm, previous, entries);
-    } catch (IllegalArgumentException e) {
-      throw new IOException(e);
-    }
-
-    final long currentTerm;
-    long nextIndex = state.getLog().getNextIndex();
-    synchronized (this) {
-      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
-      currentTerm = state.getCurrentTerm();
-      if (!recognized) {
-        final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
-            leaderId, getId(), currentTerm, nextIndex, NOT_LEADER);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{}: do not recognize leader. Reply: {}",
-              getId(), ProtoUtils.toString(reply));
-        }
-        return reply;
-      }
-      changeToFollower(leaderTerm, true);
-      state.setLeader(leaderId);
-
-      if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
-        heartbeatMonitor = new FollowerState(this);
-        heartbeatMonitor.start();
-      }
-      if (lifeCycle.getCurrentState() == RUNNING) {
-        heartbeatMonitor.updateLastRpcTime(true);
-      }
-
-      // We need to check if "previous" is in the local peer. Note that it is
-      // possible that "previous" is covered by the latest snapshot: e.g.,
-      // it's possible there's no log entries outside of the latest snapshot.
-      // However, it is not possible that "previous" index is smaller than the
-      // last index included in snapshot. This is because indices <= snapshot's
-      // last index should have been committed.
-      if (previous != null && !containPrevious(previous)) {
-        final AppendEntriesReplyProto reply =
-            ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(),
-                currentTerm, Math.min(nextIndex, previous.getIndex()), INCONSISTENCY);
-        LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}",
-            getId(), previous, ServerProtoUtils.toString(reply));
-        return reply;
-      }
-
-      state.getLog().append(entries);
-      state.updateConfiguration(entries);
-      state.updateStatemachine(leaderCommit, currentTerm);
-    }
-    if (entries != null && entries.length > 0) {
-      try {
-        state.getLog().logSync();
-      } catch (InterruptedException e) {
-        throw new InterruptedIOException("logSync got interrupted");
-      }
-      nextIndex = entries[entries.length - 1].getIndex() + 1;
-    }
-    synchronized (this) {
-      if (lifeCycle.getCurrentState() == RUNNING && isFollower()
-          && getState().getCurrentTerm() == currentTerm) {
-        // reset election timer to avoid punishing the leader for our own
-        // long disk writes
-        heartbeatMonitor.updateLastRpcTime(false);
-      }
-    }
-    final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
-        leaderId, getId(), currentTerm, nextIndex, SUCCESS);
-    LOG.debug("{}: succeeded to handle AppendEntries. Reply: {}", getId(),
-        ServerProtoUtils.toString(reply));
-    return reply;
-  }
-
-  private boolean containPrevious(TermIndex previous) {
-    LOG.debug("{}: prev:{}, latestSnapshot:{}, getLatestInstalledSnapshot:{}",
-        getId(), previous, state.getLatestSnapshot(), state.getLatestInstalledSnapshot());
-    return state.getLog().contains(previous)
-        ||  (state.getLatestSnapshot() != null
-             && state.getLatestSnapshot().getTermIndex().equals(previous))
-        || (state.getLatestInstalledSnapshot() != null)
-             && state.getLatestInstalledSnapshot().equals(previous);
-  }
-
-  @Override
-  public InstallSnapshotReplyProto installSnapshot(
-      InstallSnapshotRequestProto request) throws IOException {
-    final String leaderId = request.getServerRequest().getRequestorId();
-    CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), leaderId, request);
-    LOG.debug("{}: receive installSnapshot({})", getId(), request);
-
-    lifeCycle.assertCurrentState(STARTING, RUNNING);
-
-    final long currentTerm;
-    final long leaderTerm = request.getLeaderTerm();
-    final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex(
-        request.getTermIndex());
-    final long lastIncludedIndex = lastTermIndex.getIndex();
-    synchronized (this) {
-      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
-      currentTerm = state.getCurrentTerm();
-      if (!recognized) {
-        final InstallSnapshotReplyProto reply = ServerProtoUtils
-            .toInstallSnapshotReplyProto(leaderId, getId(), currentTerm,
-                request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
-        LOG.debug("{}: do not recognize leader for installing snapshot." +
-            " Reply: {}", getId(), reply);
-        return reply;
-      }
-      changeToFollower(leaderTerm, true);
-      state.setLeader(leaderId);
-
-      if (lifeCycle.getCurrentState() == RUNNING) {
-        heartbeatMonitor.updateLastRpcTime(true);
-      }
-
-      // Check and append the snapshot chunk. We simply put this in lock
-      // considering a follower peer requiring a snapshot installation does not
-      // have a lot of requests
-      Preconditions.checkState(
-          state.getLog().getNextIndex() <= lastIncludedIndex,
-          "%s log's next id is %s, last included index in snapshot is %s",
-          getId(),  state.getLog().getNextIndex(), lastIncludedIndex);
-
-      //TODO: We should only update State with installed snapshot once the request is done.
-      state.installSnapshot(request);
-
-      // update the committed index
-      // re-load the state machine if this is the last chunk
-      if (request.getDone()) {
-        state.reloadStateMachine(lastIncludedIndex, leaderTerm);
-      }
-      if (lifeCycle.getCurrentState() == RUNNING) {
-        heartbeatMonitor.updateLastRpcTime(false);
-      }
-    }
-    if (request.getDone()) {
-      LOG.info("{}: successfully install the whole snapshot-{}", getId(),
-          lastIncludedIndex);
-    }
-    return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(),
-        currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS);
-  }
-
-  AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm,
-      String targetId, TermIndex previous, List<LogEntryProto> entries,
-      boolean initializing) {
-    return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId,
-        leaderTerm, entries, state.getLog().getLastCommittedIndex(),
-        initializing, previous);
-  }
-
-  synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
-      String targetId, String requestId, int requestIndex, SnapshotInfo snapshot,
-      List<FileChunkProto> chunks, boolean done) {
-    OptionalLong totalSize = snapshot.getFiles().stream()
-        .mapToLong(FileInfo::getFileSize).reduce(Long::sum);
-    assert totalSize.isPresent();
-    return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId,
-        requestId, requestIndex, state.getCurrentTerm(), snapshot.getTermIndex(),
-        chunks, totalSize.getAsLong(), done);
-  }
-
-  synchronized RequestVoteRequestProto createRequestVoteRequest(String targetId,
-      long term, TermIndex lastEntry) {
-    return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId, term,
-        lastEntry);
-  }
-
-  public synchronized void submitLocalSyncEvent() {
-    if (isLeader() && leaderState != null) {
-      leaderState.submitUpdateStateEvent(UPDATE_COMMIT_EVENT);
-    }
-  }
-
-  public void addPeersToRPC(Iterable<RaftPeer> peers) {
-    serverRpc.addPeers(peers);
-  }
-
-  synchronized void replyPendingRequest(long logIndex,
-      CompletableFuture<Message> message) {
-    if (isLeader() && leaderState != null) { // is leader and is running
-      leaderState.replyPendingRequest(logIndex, message);
-    }
-  }
-
-  TransactionContext getTransactionContext(long index) {
-    if (leaderState != null) { // is leader and is running
-      return leaderState.getTransactionContext(index);
-    }
-    return null;
-  }
-
-  public RaftProperties getProperties() {
-    return this.properties;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServerConfigKeys.java b/raft-server/src/main/java/org/apache/raft/server/RaftServerConfigKeys.java
index 837b53b..2ce0326 100644
--- a/raft-server/src/main/java/org/apache/raft/server/RaftServerConfigKeys.java
+++ b/raft-server/src/main/java/org/apache/raft/server/RaftServerConfigKeys.java
@@ -17,6 +17,7 @@
  */
 package org.apache.raft.server;
 
+import org.apache.raft.server.impl.LogAppenderFactory;
 import org.apache.raft.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RaftServerConstants.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServerConstants.java b/raft-server/src/main/java/org/apache/raft/server/RaftServerConstants.java
deleted file mode 100644
index f6781f3..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/RaftServerConstants.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import org.apache.raft.client.RaftClient;
-
-public interface RaftServerConstants {
-  long INVALID_LOG_INDEX = -1;
-  byte LOG_TERMINATE_BYTE = 0;
-  long DEFAULT_SEQNUM = RaftClient.DEFAULT_SEQNUM;
-
-  enum StartupOption {
-    FORMAT("format"),
-    REGULAR("regular");
-
-    private final String option;
-
-    StartupOption(String arg) {
-      this.option = arg;
-    }
-
-    public static StartupOption getOption(String arg) {
-      for (StartupOption s : StartupOption.values()) {
-        if (s.option.equals(arg)) {
-          return s;
-        }
-      }
-      return REGULAR;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java b/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java
deleted file mode 100644
index de81ec2..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-public interface RaftServerRpc {
-  void start();
-
-  void shutdown();
-
-  InetSocketAddress getInetSocketAddress();
-
-  AppendEntriesReplyProto sendAppendEntries(
-      AppendEntriesRequestProto request) throws IOException;
-
-  InstallSnapshotReplyProto sendInstallSnapshot(
-      InstallSnapshotRequestProto request) throws IOException;
-
-  RequestVoteReplyProto sendRequestVote(RequestVoteRequestProto request)
-      throws IOException;
-
-  /** add information of the given peers */
-  void addPeers(Iterable<RaftPeer> peers);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RequestDispatcher.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/RequestDispatcher.java b/raft-server/src/main/java/org/apache/raft/server/RequestDispatcher.java
deleted file mode 100644
index e281bfa..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/RequestDispatcher.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import org.apache.raft.protocol.*;
-import org.apache.raft.server.protocol.RaftServerProtocol;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
-/**
- * Each RPC request is first handled by the RequestDispatcher:
- * 1. A request from another RaftPeer is to be handled by RaftServer.
- *
- * If the raft peer is the leader, then:
- *
- * 2. A read-only request from client is to be handled by the state machine.
- * 3. A write request from client is first validated by the state machine. The
- * state machine returns the content of the raft log entry, which is then passed
- * to the RaftServer for replication.
- */
-public class RequestDispatcher implements RaftClientProtocol, RaftServerProtocol {
-  static final Logger LOG = LoggerFactory.getLogger(RequestDispatcher.class);
-
-  private final RaftServer server;
-  private final StateMachine stateMachine;
-
-  public RequestDispatcher(RaftServer server) {
-    this.server = server;
-    this.stateMachine = server.getStateMachine();
-  }
-
-  public CompletableFuture<RaftClientReply> handleClientRequest(
-      RaftClientRequest request) throws IOException {
-    // first check the server's leader state
-    CompletableFuture<RaftClientReply> reply = server.checkLeaderState(request);
-    if (reply != null) {
-      return reply;
-    }
-
-    // let the state machine handle read-only request from client
-    if (request.isReadOnly()) {
-      // TODO: We might not be the leader anymore by the time this completes. See the RAFT paper,
-      // section 8 (last part)
-      return stateMachine.query(request);
-    }
-
-    // TODO: this client request will not be added to pending requests
-    // until later which means that any failure in between will leave partial state in the
-    // state machine. We should call cancelTransaction() for failed requests
-    TransactionContext entry = stateMachine.startTransaction(request);
-    if (entry.getException().isPresent()) {
-      throw RaftUtils.asIOException(entry.getException().get());
-    }
-
-    return server.appendTransaction(request, entry);
-  }
-
-  @Override
-  public RaftClientReply submitClientRequest(RaftClientRequest request)
-      throws IOException {
-    return waitForReply(server.getId(), request, handleClientRequest(request));
-  }
-
-  public CompletableFuture<RaftClientReply> setConfigurationAsync(
-      SetConfigurationRequest request) throws IOException {
-    return server.setConfiguration(request);
-  }
-
-  @Override
-  public RaftClientReply setConfiguration(SetConfigurationRequest request)
-      throws IOException {
-    return waitForReply(server.getId(), request, setConfigurationAsync(request));
-  }
-
-  private static RaftClientReply waitForReply(String serverId,
-      RaftClientRequest request, CompletableFuture<RaftClientReply> future)
-      throws IOException {
-    try {
-      return future.get();
-    } catch (InterruptedException e) {
-      final String s = serverId + ": Interrupted when waiting for reply, request=" + request;
-      LOG.info(s, e);
-      throw RaftUtils.toInterruptedIOException(s, e);
-    } catch (ExecutionException e) {
-      final Throwable cause = e.getCause();
-      if (cause == null) {
-        throw new IOException(e);
-      }
-      if (cause instanceof NotLeaderException) {
-        return new RaftClientReply(request, (NotLeaderException)cause);
-      } else {
-        throw RaftUtils.asIOException(cause);
-      }
-    }
-  }
-
-  @Override
-  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
-      throws IOException {
-    return server.requestVote(request);
-  }
-
-  @Override
-  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request)
-      throws IOException {
-    return server.appendEntries(request);
-  }
-
-  @Override
-  public InstallSnapshotReplyProto installSnapshot(
-      InstallSnapshotRequestProto request) throws IOException {
-    return server.installSnapshot(request);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/Role.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/Role.java b/raft-server/src/main/java/org/apache/raft/server/Role.java
deleted file mode 100644
index a7e2f4c..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/Role.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-/**
- * Role of Raft peer
- */
-public enum Role {
-  LEADER, CANDIDATE, FOLLOWER
-}