You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/01/31 21:17:22 UTC

[37/54] [abbrv] incubator-ratis git commit: Renamed the packages from raft to ratis in preparation for Apache Incubation - Moved all java packages from org.apache.raft to org.apache.ratis. - Moved native package to org_apache_ratis, and native lib to l

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
deleted file mode 100644
index 4b8c442..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
+++ /dev/null
@@ -1,813 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.*;
-import org.apache.raft.server.RaftServer;
-import org.apache.raft.server.RaftServerConfigKeys;
-import org.apache.raft.server.RaftServerRpc;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.FileInfo;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.CodeInjectionForTesting;
-import org.apache.raft.util.LifeCycle;
-import org.apache.raft.util.ProtoUtils;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.OptionalLong;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
-import static org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*;
-import static org.apache.raft.util.LifeCycle.State.*;
-
-public class RaftServerImpl implements RaftServer {
-  public static final Logger LOG = LoggerFactory.getLogger(RaftServerImpl.class);
-
-  private static final String CLASS_NAME = RaftServerImpl.class.getSimpleName();
-  static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
-  static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
-  static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
-
-
-  /** Role of raft peer */
-  enum Role {
-    LEADER, CANDIDATE, FOLLOWER
-  }
-
-  private final int minTimeoutMs;
-  private final int maxTimeoutMs;
-
-  private final LifeCycle lifeCycle;
-  private final ServerState state;
-  private final StateMachine stateMachine;
-  private final RaftProperties properties;
-  private volatile Role role;
-
-  /** used when the peer is follower, to monitor election timeout */
-  private volatile FollowerState heartbeatMonitor;
-
-  /** used when the peer is candidate, to request votes from other peers */
-  private volatile LeaderElection electionDaemon;
-
-  /** used when the peer is leader */
-  private volatile LeaderState leaderState;
-
-  private RaftServerRpc serverRpc;
-
-  private final LogAppenderFactory appenderFactory;
-
-  public RaftServerImpl(String id, RaftConfiguration raftConf,
-                        RaftProperties properties, StateMachine stateMachine) throws IOException {
-    this.lifeCycle = new LifeCycle(id);
-    minTimeoutMs = properties.getInt(
-        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY,
-        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT);
-    maxTimeoutMs = properties.getInt(
-        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY,
-        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT);
-    Preconditions.checkArgument(maxTimeoutMs > minTimeoutMs,
-        "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
-    this.properties = properties;
-    this.stateMachine = stateMachine;
-    this.state = new ServerState(id, raftConf, properties, this, stateMachine);
-    appenderFactory = initAppenderFactory();
-  }
-
-  int getMinTimeoutMs() {
-    return minTimeoutMs;
-  }
-
-  int getMaxTimeoutMs() {
-    return maxTimeoutMs;
-  }
-
-  int getRandomTimeoutMs() {
-    return RaftUtils.getRandomBetween(minTimeoutMs, maxTimeoutMs);
-  }
-
-  @Override
-  public StateMachine getStateMachine() {
-    return this.stateMachine;
-  }
-
-  public LogAppenderFactory getLogAppenderFactory() {
-    return appenderFactory;
-  }
-
-  private LogAppenderFactory initAppenderFactory() {
-    Class<? extends LogAppenderFactory> factoryClass = properties.getClass(
-        RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
-        RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT,
-        LogAppenderFactory.class);
-    return RaftUtils.newInstance(factoryClass);
-  }
-
-  /**
-   * Used by tests to set initial raft configuration with correct port bindings.
-   */
-  @VisibleForTesting
-  public void setInitialConf(RaftConfiguration conf) {
-    this.state.setInitialConf(conf);
-  }
-
-  @Override
-  public void setServerRpc(RaftServerRpc serverRpc) {
-    this.serverRpc = serverRpc;
-    // add peers into rpc service
-    RaftConfiguration conf = getRaftConf();
-    if (conf != null) {
-      serverRpc.addPeers(conf.getPeers());
-    }
-  }
-
-  public RaftServerRpc getServerRpc() {
-    return serverRpc;
-  }
-
-  @Override
-  public void start() {
-    lifeCycle.transition(STARTING);
-    state.start();
-    RaftConfiguration conf = getRaftConf();
-    if (conf != null && conf.contains(getId())) {
-      LOG.debug("{} starts as a follower", getId());
-      startAsFollower();
-    } else {
-      LOG.debug("{} starts with initializing state", getId());
-      startInitializing();
-    }
-  }
-
-  /**
-   * The peer belongs to the current configuration, should start as a follower
-   */
-  private void startAsFollower() {
-    role = Role.FOLLOWER;
-    heartbeatMonitor = new FollowerState(this);
-    heartbeatMonitor.start();
-
-    serverRpc.start();
-    lifeCycle.transition(RUNNING);
-  }
-
-  /**
-   * The peer does not have any configuration (maybe it will later be included
-   * in some configuration). Start still as a follower but will not vote or
-   * start election.
-   */
-  private void startInitializing() {
-    role = Role.FOLLOWER;
-    // do not start heartbeatMonitoring
-    serverRpc.start();
-  }
-
-  public ServerState getState() {
-    return this.state;
-  }
-
-  @Override
-  public String getId() {
-    return getState().getSelfId();
-  }
-
-  RaftConfiguration getRaftConf() {
-    return getState().getRaftConf();
-  }
-
-  @Override
-  public void close() {
-    lifeCycle.checkStateAndClose(() -> {
-      try {
-        shutdownHeartbeatMonitor();
-        shutdownElectionDaemon();
-        shutdownLeaderState();
-
-        serverRpc.close();
-        state.close();
-      } catch (Exception ignored) {
-        LOG.warn("Failed to kill " + state.getSelfId(), ignored);
-      }
-    });
-  }
-
-  @VisibleForTesting
-  public boolean isAlive() {
-    return !lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED);
-  }
-
-  public boolean isFollower() {
-    return role == Role.FOLLOWER;
-  }
-
-  public boolean isCandidate() {
-    return role == Role.CANDIDATE;
-  }
-
-  public boolean isLeader() {
-    return role == Role.LEADER;
-  }
-
-  /**
-   * Change the server state to Follower if necessary
-   * @param newTerm The new term.
-   * @param sync We will call {@link ServerState#persistMetadata()} if this is
-   *             set to true and term/votedFor get updated.
-   * @return if the term/votedFor should be updated to the new term
-   * @throws IOException if term/votedFor persistence failed.
-   */
-  synchronized boolean changeToFollower(long newTerm, boolean sync)
-      throws IOException {
-    final Role old = role;
-    role = Role.FOLLOWER;
-
-    boolean metadataUpdated = false;
-    if (newTerm > state.getCurrentTerm()) {
-      state.setCurrentTerm(newTerm);
-      state.resetLeaderAndVotedFor();
-      metadataUpdated = true;
-    }
-
-    if (old == Role.LEADER) {
-      assert leaderState != null;
-      shutdownLeaderState();
-    } else if (old == Role.CANDIDATE) {
-      shutdownElectionDaemon();
-    }
-
-    if (old != Role.FOLLOWER) {
-      heartbeatMonitor = new FollowerState(this);
-      heartbeatMonitor.start();
-    }
-
-    if (metadataUpdated && sync) {
-      state.persistMetadata();
-    }
-    return metadataUpdated;
-  }
-
-  private synchronized void shutdownLeaderState() {
-    final LeaderState leader = leaderState;
-    if (leader != null) {
-      leader.stop();
-    }
-    leaderState = null;
-    // TODO: make sure that StateMachineUpdater has applied all transactions that have context
-  }
-
-  private void shutdownElectionDaemon() {
-    final LeaderElection election = electionDaemon;
-    if (election != null) {
-      election.stopRunning();
-      // no need to interrupt the election thread
-    }
-    electionDaemon = null;
-  }
-
-  synchronized void changeToLeader() {
-    Preconditions.checkState(isCandidate());
-    shutdownElectionDaemon();
-    role = Role.LEADER;
-    state.becomeLeader();
-    // start sending AppendEntries RPC to followers
-    leaderState = new LeaderState(this, properties);
-    leaderState.start();
-  }
-
-  private void shutdownHeartbeatMonitor() {
-    final FollowerState hm = heartbeatMonitor;
-    if (hm != null) {
-      hm.stopRunning();
-      hm.interrupt();
-    }
-    heartbeatMonitor = null;
-  }
-
-  synchronized void changeToCandidate() {
-    Preconditions.checkState(isFollower());
-    shutdownHeartbeatMonitor();
-    role = Role.CANDIDATE;
-    // start election
-    electionDaemon = new LeaderElection(this);
-    electionDaemon.start();
-  }
-
-  @Override
-  public String toString() {
-    return role + " " + state + " " + lifeCycle.getCurrentState();
-  }
-
-  /**
-   * @return null if the server is in leader state.
-   */
-  private CompletableFuture<RaftClientReply> checkLeaderState(
-      RaftClientRequest request) {
-    if (!isLeader()) {
-      NotLeaderException exception = generateNotLeaderException();
-      CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
-      future.complete(new RaftClientReply(request, exception));
-      return future;
-    }
-    return null;
-  }
-
-  NotLeaderException generateNotLeaderException() {
-    if (lifeCycle.getCurrentState() != RUNNING) {
-      return new NotLeaderException(getId(), null, null);
-    }
-    String leaderId = state.getLeaderId();
-    if (leaderId == null || leaderId.equals(state.getSelfId())) {
-      // No idea about who is the current leader. Or the peer is the current
-      // leader, but it is about to step down
-      RaftPeer suggestedLeader = state.getRaftConf()
-          .getRandomPeer(state.getSelfId());
-      leaderId = suggestedLeader == null ? null : suggestedLeader.getId();
-    }
-    RaftConfiguration conf = getRaftConf();
-    Collection<RaftPeer> peers = conf.getPeers();
-    return new NotLeaderException(getId(), conf.getPeer(leaderId),
-        peers.toArray(new RaftPeer[peers.size()]));
-  }
-
-  /**
-   * Handle a normal update request from client.
-   */
-  private CompletableFuture<RaftClientReply> appendTransaction(
-      RaftClientRequest request, TransactionContext entry)
-      throws RaftException {
-    LOG.debug("{}: receive client request({})", getId(), request);
-    lifeCycle.assertCurrentState(RUNNING);
-    CompletableFuture<RaftClientReply> reply;
-
-    final PendingRequest pending;
-    synchronized (this) {
-      reply = checkLeaderState(request);
-      if (reply != null) {
-        return reply;
-      }
-
-      // append the message to its local log
-      final long entryIndex;
-      try {
-        entryIndex = state.applyLog(entry);
-      } catch (IOException e) {
-        throw new RaftException(e);
-      }
-
-      // put the request into the pending queue
-      pending = leaderState.addPendingRequest(entryIndex, request, entry);
-      leaderState.notifySenders();
-    }
-    return pending.getFuture();
-  }
-
-  @Override
-  public CompletableFuture<RaftClientReply> submitClientRequestAsync(
-      RaftClientRequest request) throws IOException {
-    // first check the server's leader state
-    CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
-    if (reply != null) {
-      return reply;
-    }
-
-    // let the state machine handle read-only request from client
-    if (request.isReadOnly()) {
-      // TODO: We might not be the leader anymore by the time this completes. See the RAFT paper,
-      // section 8 (last part)
-      return stateMachine.query(request);
-    }
-
-    // TODO: this client request will not be added to pending requests
-    // until later which means that any failure in between will leave partial state in the
-    // state machine. We should call cancelTransaction() for failed requests
-    TransactionContext entry = stateMachine.startTransaction(request);
-    if (entry.getException().isPresent()) {
-      throw RaftUtils.asIOException(entry.getException().get());
-    }
-
-    return appendTransaction(request, entry);
-  }
-
-  @Override
-  public RaftClientReply submitClientRequest(RaftClientRequest request)
-      throws IOException {
-    return waitForReply(getId(), request, submitClientRequestAsync(request));
-  }
-
-  private static RaftClientReply waitForReply(String id, RaftClientRequest request,
-      CompletableFuture<RaftClientReply> future) throws IOException {
-    try {
-      return future.get();
-    } catch (InterruptedException e) {
-      final String s = id + ": Interrupted when waiting for reply, request=" + request;
-      LOG.info(s, e);
-      throw RaftUtils.toInterruptedIOException(s, e);
-    } catch (ExecutionException e) {
-      final Throwable cause = e.getCause();
-      if (cause == null) {
-        throw new IOException(e);
-      }
-      if (cause instanceof NotLeaderException) {
-        return new RaftClientReply(request, (NotLeaderException)cause);
-      } else {
-        throw RaftUtils.asIOException(cause);
-      }
-    }
-  }
-
-  @Override
-  public RaftClientReply setConfiguration(SetConfigurationRequest request)
-      throws IOException {
-    return waitForReply(getId(), request, setConfigurationAsync(request));
-  }
-
-  /**
-   * Handle a raft configuration change request from client.
-   */
-  @Override
-  public CompletableFuture<RaftClientReply> setConfigurationAsync(
-      SetConfigurationRequest request) throws IOException {
-    LOG.debug("{}: receive setConfiguration({})", getId(), request);
-    lifeCycle.assertCurrentState(RUNNING);
-    CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
-    if (reply != null) {
-      return reply;
-    }
-
-    final RaftPeer[] peersInNewConf = request.getPeersInNewConf();
-    final PendingRequest pending;
-    synchronized (this) {
-      reply = checkLeaderState(request);
-      if (reply != null) {
-        return reply;
-      }
-
-      final RaftConfiguration current = getRaftConf();
-      // make sure there is no other raft reconfiguration in progress
-      if (!current.isStable() || leaderState.inStagingState() ||
-          !state.isCurrentConfCommitted()) {
-        throw new ReconfigurationInProgressException(
-            "Reconfiguration is already in progress: " + current);
-      }
-
-      // return true if the new configuration is the same with the current one
-      if (current.hasNoChange(peersInNewConf)) {
-        pending = leaderState.returnNoConfChange(request);
-        return pending.getFuture();
-      }
-
-      // add new peers into the rpc service
-      getServerRpc().addPeers(Arrays.asList(peersInNewConf));
-      // add staging state into the leaderState
-      pending = leaderState.startSetConfiguration(request);
-    }
-    return pending.getFuture();
-  }
-
-  private boolean shouldWithholdVotes() {
-    return isLeader() || (isFollower() && state.hasLeader()
-        && heartbeatMonitor.shouldWithholdVotes());
-  }
-
-  /**
-   * check if the remote peer is not included in the current conf
-   * and should shutdown. should shutdown if all the following stands:
-   * 1. this is a leader
-   * 2. current conf is stable and has been committed
-   * 3. candidate id is not included in conf
-   * 4. candidate's last entry's index < conf's index
-   */
-  private boolean shouldSendShutdown(String candidateId,
-      TermIndex candidateLastEntry) {
-    return isLeader()
-        && getRaftConf().isStable()
-        && getState().isConfCommitted()
-        && !getRaftConf().containsInConf(candidateId)
-        && candidateLastEntry.getIndex() < getRaftConf().getLogEntryIndex()
-        && !leaderState.isBootStrappingPeer(candidateId);
-  }
-
-  @Override
-  public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
-      throws IOException {
-    final String candidateId = r.getServerRequest().getRequestorId();
-    return requestVote(candidateId, r.getCandidateTerm(),
-        ServerProtoUtils.toTermIndex(r.getCandidateLastEntry()));
-  }
-
-  private RequestVoteReplyProto requestVote(String candidateId,
-      long candidateTerm, TermIndex candidateLastEntry) throws IOException {
-    CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
-        candidateId, candidateTerm, candidateLastEntry);
-    LOG.debug("{}: receive requestVote({}, {}, {})",
-        getId(), candidateId, candidateTerm, candidateLastEntry);
-    lifeCycle.assertCurrentState(RUNNING);
-
-    boolean voteGranted = false;
-    boolean shouldShutdown = false;
-    final RequestVoteReplyProto reply;
-    synchronized (this) {
-      if (shouldWithholdVotes()) {
-        LOG.info("{} Withhold vote from server {} with term {}. " +
-            "This server:{}, last rpc time from leader {} is {}", getId(),
-            candidateId, candidateTerm, this, this.getState().getLeaderId(),
-            (isFollower() ? heartbeatMonitor.getLastRpcTime() : -1));
-      } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
-        boolean termUpdated = changeToFollower(candidateTerm, false);
-        // see Section 5.4.1 Election restriction
-        if (state.isLogUpToDate(candidateLastEntry)) {
-          heartbeatMonitor.updateLastRpcTime(false);
-          state.grantVote(candidateId);
-          voteGranted = true;
-        }
-        if (termUpdated || voteGranted) {
-          state.persistMetadata(); // sync metafile
-        }
-      }
-      if (!voteGranted && shouldSendShutdown(candidateId, candidateLastEntry)) {
-        shouldShutdown = true;
-      }
-      reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(),
-          voteGranted, state.getCurrentTerm(), shouldShutdown);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("{} replies to vote request: {}. Peer's state: {}",
-            getId(), ProtoUtils.toString(reply), state);
-      }
-    }
-    return reply;
-  }
-
-  private void validateEntries(long expectedTerm, TermIndex previous,
-      LogEntryProto... entries) {
-    if (entries != null && entries.length > 0) {
-      final long index0 = entries[0].getIndex();
-
-      if (previous == null || previous.getTerm() == 0) {
-        Preconditions.checkArgument(index0 == 0,
-            "Unexpected Index: previous is null but entries[%s].getIndex()=%s",
-            0, index0);
-      } else {
-        Preconditions.checkArgument(previous.getIndex() == index0 - 1,
-            "Unexpected Index: previous is %s but entries[%s].getIndex()=%s",
-            previous, 0, index0);
-      }
-
-      for (int i = 0; i < entries.length; i++) {
-        final long t = entries[i].getTerm();
-        Preconditions.checkArgument(expectedTerm >= t,
-            "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s",
-            i, t, expectedTerm);
-
-        final long indexi = entries[i].getIndex();
-        Preconditions.checkArgument(indexi == index0 + i,
-            "Unexpected Index: entries[%s].getIndex()=%s but entries[0].getIndex()=%s",
-            i, indexi, index0);
-      }
-    }
-  }
-
-  @Override
-  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
-      throws IOException {
-    // TODO avoid converting list to array
-    final LogEntryProto[] entries = r.getEntriesList()
-        .toArray(new LogEntryProto[r.getEntriesCount()]);
-    final TermIndex previous = r.hasPreviousLog() ?
-        ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null;
-    return appendEntries(r.getServerRequest().getRequestorId(),
-        r.getLeaderTerm(), previous, r.getLeaderCommit(), r.getInitializing(),
-        entries);
-  }
-
-  private AppendEntriesReplyProto appendEntries(String leaderId, long leaderTerm,
-      TermIndex previous, long leaderCommit, boolean initializing,
-      LogEntryProto... entries) throws IOException {
-    CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
-        leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("{}: receive appendEntries({}, {}, {}, {}, {}, {})", getId(),
-          leaderId, leaderTerm, previous, leaderCommit, initializing,
-          ServerProtoUtils.toString(entries));
-    }
-    lifeCycle.assertCurrentState(STARTING, RUNNING);
-
-    try {
-      validateEntries(leaderTerm, previous, entries);
-    } catch (IllegalArgumentException e) {
-      throw new IOException(e);
-    }
-
-    final long currentTerm;
-    long nextIndex = state.getLog().getNextIndex();
-    synchronized (this) {
-      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
-      currentTerm = state.getCurrentTerm();
-      if (!recognized) {
-        final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
-            leaderId, getId(), currentTerm, nextIndex, NOT_LEADER);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{}: do not recognize leader. Reply: {}",
-              getId(), ProtoUtils.toString(reply));
-        }
-        return reply;
-      }
-      changeToFollower(leaderTerm, true);
-      state.setLeader(leaderId);
-
-      if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
-        heartbeatMonitor = new FollowerState(this);
-        heartbeatMonitor.start();
-      }
-      if (lifeCycle.getCurrentState() == RUNNING) {
-        heartbeatMonitor.updateLastRpcTime(true);
-      }
-
-      // We need to check if "previous" is in the local peer. Note that it is
-      // possible that "previous" is covered by the latest snapshot: e.g.,
-      // it's possible there's no log entries outside of the latest snapshot.
-      // However, it is not possible that "previous" index is smaller than the
-      // last index included in snapshot. This is because indices <= snapshot's
-      // last index should have been committed.
-      if (previous != null && !containPrevious(previous)) {
-        final AppendEntriesReplyProto reply =
-            ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(),
-                currentTerm, Math.min(nextIndex, previous.getIndex()), INCONSISTENCY);
-        LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}",
-            getId(), previous, ServerProtoUtils.toString(reply));
-        return reply;
-      }
-
-      state.getLog().append(entries);
-      state.updateConfiguration(entries);
-      state.updateStatemachine(leaderCommit, currentTerm);
-    }
-    if (entries != null && entries.length > 0) {
-      try {
-        state.getLog().logSync();
-      } catch (InterruptedException e) {
-        throw new InterruptedIOException("logSync got interrupted");
-      }
-      nextIndex = entries[entries.length - 1].getIndex() + 1;
-    }
-    synchronized (this) {
-      if (lifeCycle.getCurrentState() == RUNNING && isFollower()
-          && getState().getCurrentTerm() == currentTerm) {
-        // reset election timer to avoid punishing the leader for our own
-        // long disk writes
-        heartbeatMonitor.updateLastRpcTime(false);
-      }
-    }
-    final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
-        leaderId, getId(), currentTerm, nextIndex, SUCCESS);
-    LOG.debug("{}: succeeded to handle AppendEntries. Reply: {}", getId(),
-        ServerProtoUtils.toString(reply));
-    return reply;
-  }
-
-  private boolean containPrevious(TermIndex previous) {
-    LOG.debug("{}: prev:{}, latestSnapshot:{}, getLatestInstalledSnapshot:{}",
-        getId(), previous, state.getLatestSnapshot(), state.getLatestInstalledSnapshot());
-    return state.getLog().contains(previous)
-        ||  (state.getLatestSnapshot() != null
-             && state.getLatestSnapshot().getTermIndex().equals(previous))
-        || (state.getLatestInstalledSnapshot() != null)
-             && state.getLatestInstalledSnapshot().equals(previous);
-  }
-
-  @Override
-  public InstallSnapshotReplyProto installSnapshot(
-      InstallSnapshotRequestProto request) throws IOException {
-    final String leaderId = request.getServerRequest().getRequestorId();
-    CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), leaderId, request);
-    LOG.debug("{}: receive installSnapshot({})", getId(), request);
-
-    lifeCycle.assertCurrentState(STARTING, RUNNING);
-
-    final long currentTerm;
-    final long leaderTerm = request.getLeaderTerm();
-    final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex(
-        request.getTermIndex());
-    final long lastIncludedIndex = lastTermIndex.getIndex();
-    synchronized (this) {
-      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
-      currentTerm = state.getCurrentTerm();
-      if (!recognized) {
-        final InstallSnapshotReplyProto reply = ServerProtoUtils
-            .toInstallSnapshotReplyProto(leaderId, getId(), currentTerm,
-                request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
-        LOG.debug("{}: do not recognize leader for installing snapshot." +
-            " Reply: {}", getId(), reply);
-        return reply;
-      }
-      changeToFollower(leaderTerm, true);
-      state.setLeader(leaderId);
-
-      if (lifeCycle.getCurrentState() == RUNNING) {
-        heartbeatMonitor.updateLastRpcTime(true);
-      }
-
-      // Check and append the snapshot chunk. We simply put this in lock
-      // considering a follower peer requiring a snapshot installation does not
-      // have a lot of requests
-      Preconditions.checkState(
-          state.getLog().getNextIndex() <= lastIncludedIndex,
-          "%s log's next id is %s, last included index in snapshot is %s",
-          getId(),  state.getLog().getNextIndex(), lastIncludedIndex);
-
-      //TODO: We should only update State with installed snapshot once the request is done.
-      state.installSnapshot(request);
-
-      // update the committed index
-      // re-load the state machine if this is the last chunk
-      if (request.getDone()) {
-        state.reloadStateMachine(lastIncludedIndex, leaderTerm);
-      }
-      if (lifeCycle.getCurrentState() == RUNNING) {
-        heartbeatMonitor.updateLastRpcTime(false);
-      }
-    }
-    if (request.getDone()) {
-      LOG.info("{}: successfully install the whole snapshot-{}", getId(),
-          lastIncludedIndex);
-    }
-    return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(),
-        currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS);
-  }
-
-  AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm,
-      String targetId, TermIndex previous, List<LogEntryProto> entries,
-      boolean initializing) {
-    return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId,
-        leaderTerm, entries, state.getLog().getLastCommittedIndex(),
-        initializing, previous);
-  }
-
-  synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
-      String targetId, String requestId, int requestIndex, SnapshotInfo snapshot,
-      List<FileChunkProto> chunks, boolean done) {
-    OptionalLong totalSize = snapshot.getFiles().stream()
-        .mapToLong(FileInfo::getFileSize).reduce(Long::sum);
-    assert totalSize.isPresent();
-    return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId,
-        requestId, requestIndex, state.getCurrentTerm(), snapshot.getTermIndex(),
-        chunks, totalSize.getAsLong(), done);
-  }
-
-  synchronized RequestVoteRequestProto createRequestVoteRequest(String targetId,
-      long term, TermIndex lastEntry) {
-    return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId, term,
-        lastEntry);
-  }
-
-  public synchronized void submitLocalSyncEvent() {
-    if (isLeader() && leaderState != null) {
-      leaderState.submitUpdateStateEvent(LeaderState.UPDATE_COMMIT_EVENT);
-    }
-  }
-
-  synchronized void replyPendingRequest(long logIndex,
-      CompletableFuture<Message> message) {
-    if (isLeader() && leaderState != null) { // is leader and is running
-      leaderState.replyPendingRequest(logIndex, message);
-    }
-  }
-
-  TransactionContext getTransactionContext(long index) {
-    if (leaderState != null) { // is leader and is running
-      return leaderState.getTransactionContext(index);
-    }
-    return null;
-  }
-
-  public RaftProperties getProperties() {
-    return this.properties;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java b/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java
deleted file mode 100644
index e30b979..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import org.apache.raft.client.impl.ClientProtoUtils;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import org.apache.raft.util.ProtoUtils;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.raft.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
-import static org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
-
-/** Server proto utilities for internal use. */
-public class ServerProtoUtils {
-  public static TermIndex toTermIndex(TermIndexProto p) {
-    return p == null? null: TermIndex.newTermIndex(p.getTerm(), p.getIndex());
-  }
-
-  public static TermIndexProto toTermIndexProto(TermIndex ti) {
-    return ti == null? null: TermIndexProto.newBuilder()
-        .setTerm(ti.getTerm())
-        .setIndex(ti.getIndex())
-        .build();
-  }
-
-  public static TermIndex toTermIndex(LogEntryProto entry) {
-    return entry == null ? null :
-        TermIndex.newTermIndex(entry.getTerm(), entry.getIndex());
-  }
-
-  public static String toString(LogEntryProto... entries) {
-    return entries == null? "null"
-        : entries.length == 0 ? "[]"
-        : entries.length == 1? "" + toTermIndex(entries[0])
-        : "" + Arrays.stream(entries).map(ServerProtoUtils::toTermIndex)
-            .collect(Collectors.toList());
-  }
-
-  public static String toString(AppendEntriesReplyProto reply) {
-    return toString(reply.getServerReply()) + "," + reply.getResult()
-        + ",nextIndex:" + reply.getNextIndex() + ",term:" + reply.getTerm();
-  }
-
-  private static String toString(RaftRpcReplyProto reply) {
-    return reply.getRequestorId() + "->" + reply.getReplyId() + ","
-        + reply.getSuccess();
-  }
-
-  public static RaftConfigurationProto toRaftConfigurationProto(
-      RaftConfiguration conf) {
-    return RaftConfigurationProto.newBuilder()
-        .addAllPeers(ProtoUtils.toRaftPeerProtos(conf.getPeersInConf()))
-        .addAllOldPeers(ProtoUtils.toRaftPeerProtos(conf.getPeersInOldConf()))
-        .build();
-  }
-
-  public static RaftConfiguration toRaftConfiguration(
-      long index, RaftConfigurationProto proto) {
-    final RaftConfiguration.Builder b = RaftConfiguration.newBuilder()
-        .setConf(ProtoUtils.toRaftPeerArray(proto.getPeersList()))
-        .setLogEntryIndex(index);
-    if (proto.getOldPeersCount() > 0) {
-      b.setOldConf(ProtoUtils.toRaftPeerArray(proto.getOldPeersList()));
-    }
-    return b.build();
-  }
-
-  public static LogEntryProto toLogEntryProto(
-      RaftConfiguration conf, long term, long index) {
-    return LogEntryProto.newBuilder()
-        .setTerm(term)
-        .setIndex(index)
-        .setConfigurationEntry(toRaftConfigurationProto(conf))
-        .build();
-  }
-
-  public static RequestVoteReplyProto toRequestVoteReplyProto(
-      String requestorId, String replyId, boolean success, long term,
-      boolean shouldShutdown) {
-    final RequestVoteReplyProto.Builder b = RequestVoteReplyProto.newBuilder();
-    b.setServerReply(ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId, replyId,
-        DEFAULT_SEQNUM, success))
-        .setTerm(term)
-        .setShouldShutdown(shouldShutdown);
-    return b.build();
-  }
-
-  public static RequestVoteRequestProto toRequestVoteRequestProto(
-      String requestorId, String replyId, long term, TermIndex lastEntry) {
-    final RequestVoteRequestProto.Builder b = RequestVoteRequestProto.newBuilder()
-        .setServerRequest(
-            ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId, DEFAULT_SEQNUM))
-        .setCandidateTerm(term);
-    if (lastEntry != null) {
-      b.setCandidateLastEntry(toTermIndexProto(lastEntry));
-    }
-    return b.build();
-  }
-
-  public static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
-      String requestorId, String replyId, long term, int requestIndex,
-      InstallSnapshotResult result) {
-    final RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId,
-        replyId, DEFAULT_SEQNUM, result == InstallSnapshotResult.SUCCESS);
-    final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
-        .newBuilder().setServerReply(rb).setTerm(term).setResult(result)
-        .setRequestIndex(requestIndex);
-    return builder.build();
-  }
-
-  public static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
-      String requestorId, String replyId, String requestId, int requestIndex,
-      long term, TermIndex lastTermIndex, List<FileChunkProto> chunks,
-      long totalSize, boolean done) {
-    return InstallSnapshotRequestProto.newBuilder()
-        .setServerRequest(
-            ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId, DEFAULT_SEQNUM))
-        .setRequestId(requestId)
-        .setRequestIndex(requestIndex)
-        // .setRaftConfiguration()  TODO: save and pass RaftConfiguration
-        .setLeaderTerm(term)
-        .setTermIndex(toTermIndexProto(lastTermIndex))
-        .addAllFileChunks(chunks)
-        .setTotalSize(totalSize)
-        .setDone(done).build();
-  }
-
-  public static AppendEntriesReplyProto toAppendEntriesReplyProto(
-      String requestorId, String replyId, long term,
-      long nextIndex, AppendEntriesReplyProto.AppendResult appendResult) {
-    RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId,
-        replyId, DEFAULT_SEQNUM, appendResult == SUCCESS);
-    final AppendEntriesReplyProto.Builder b = AppendEntriesReplyProto.newBuilder();
-    b.setServerReply(rb).setTerm(term).setNextIndex(nextIndex)
-        .setResult(appendResult);
-    return b.build();
-  }
-
-  public static AppendEntriesRequestProto toAppendEntriesRequestProto(
-      String requestorId, String replyId, long leaderTerm,
-      List<LogEntryProto> entries, long leaderCommit, boolean initializing,
-      TermIndex previous) {
-    final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto
-        .newBuilder()
-        .setServerRequest(
-            ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId, DEFAULT_SEQNUM))
-        .setLeaderTerm(leaderTerm)
-        .setLeaderCommit(leaderCommit)
-        .setInitializing(initializing);
-    if (entries != null && !entries.isEmpty()) {
-      b.addAllEntries(entries);
-    }
-
-    if (previous != null) {
-      b.setPreviousLog(toTermIndexProto(previous));
-    }
-    return b.build();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java b/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java
deleted file mode 100644
index 8611101..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.*;
-import org.apache.raft.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.ProtoUtils;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_DEFAULT;
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_KEY;
-
-/**
- * Common states of a raft peer. Protected by RaftServer's lock.
- */
-public class ServerState implements Closeable {
-  private final String selfId;
-  private final RaftServerImpl server;
-  /** Raft log */
-  private final RaftLog log;
-  /** Raft configuration */
-  private final ConfigurationManager configurationManager;
-  /** The thread that applies committed log entries to the state machine */
-  private final StateMachineUpdater stateMachineUpdater;
-  /** local storage for log and snapshot */
-  private final RaftStorage storage;
-  private final SnapshotManager snapshotManager;
-
-  /**
-   * Latest term server has seen. initialized to 0 on first boot, increases
-   * monotonically.
-   */
-  private long currentTerm;
-  /**
-   * The server ID of the leader for this term. Null means either there is
-   * no leader for this term yet or this server does not know who it is yet.
-   */
-  private String leaderId;
-  /**
-   * Candidate that this peer granted vote for in current term (or null if none)
-   */
-  private String votedFor;
-
-  /**
-   * Latest installed snapshot for this server. This maybe different than StateMachine's latest
-   * snapshot. Once we successfully install a snapshot, the SM may not pick it up immediately.
-   * Further, this will not get updated when SM does snapshots itself.
-   */
-  private TermIndex latestInstalledSnapshot;
-
-  ServerState(String id, RaftConfiguration conf, RaftProperties prop,
-              RaftServerImpl server, StateMachine stateMachine) throws IOException {
-    this.selfId = id;
-    this.server = server;
-    configurationManager = new ConfigurationManager(conf);
-    storage = new RaftStorage(prop, RaftServerConstants.StartupOption.REGULAR);
-    snapshotManager = new SnapshotManager(storage, id);
-
-    long lastApplied = initStatemachine(stateMachine, prop);
-
-    leaderId = null;
-    log = initLog(id, prop, server, lastApplied);
-    RaftLog.Metadata metadata = log.loadMetadata();
-    currentTerm = metadata.getTerm();
-    votedFor = metadata.getVotedFor();
-
-    stateMachineUpdater = new StateMachineUpdater(stateMachine, server, log,
-         lastApplied, prop);
-  }
-
-  /**
-   * Used by tests to set initial raft configuration with correct port bindings.
-   */
-  @VisibleForTesting
-  public void setInitialConf(RaftConfiguration initialConf) {
-    configurationManager.setInitialConf(initialConf);
-  }
-
-  private long initStatemachine(StateMachine sm, RaftProperties properties)
-      throws IOException {
-    sm.initialize(selfId, properties, storage);
-    storage.setStateMachineStorage(sm.getStateMachineStorage());
-    SnapshotInfo snapshot = sm.getLatestSnapshot();
-
-    if (snapshot == null || snapshot.getTermIndex().getIndex() < 0) {
-      return RaftServerConstants.INVALID_LOG_INDEX;
-    }
-
-    // get the raft configuration from the snapshot
-    RaftConfiguration raftConf = sm.getRaftConfiguration();
-    if (raftConf != null) {
-      configurationManager.addConfiguration(raftConf.getLogEntryIndex(),
-          raftConf);
-    }
-    return snapshot.getIndex();
-  }
-
-  void start() {
-    stateMachineUpdater.start();
-  }
-
-  /**
-   * note we do not apply log entries to the state machine here since we do not
-   * know whether they have been committed.
-   */
-  private RaftLog initLog(String id, RaftProperties prop, RaftServerImpl server,
-      long lastIndexInSnapshot) throws IOException {
-    final RaftLog log;
-    if (prop.getBoolean(RAFT_SERVER_USE_MEMORY_LOG_KEY,
-        RAFT_SERVER_USE_MEMORY_LOG_DEFAULT)) {
-      log = new MemoryRaftLog(id);
-    } else {
-      log = new SegmentedRaftLog(id, server, this.storage,
-          lastIndexInSnapshot, prop);
-    }
-    log.open(configurationManager, lastIndexInSnapshot);
-    return log;
-  }
-
-  public RaftConfiguration getRaftConf() {
-    return configurationManager.getCurrent();
-  }
-
-  @VisibleForTesting
-
-  public String getSelfId() {
-    return this.selfId;
-  }
-
-  public long getCurrentTerm() {
-    return currentTerm;
-  }
-
-  void setCurrentTerm(long term) {
-    currentTerm = term;
-  }
-
-  String getLeaderId() {
-    return leaderId;
-  }
-
-  boolean hasLeader() {
-    return leaderId != null;
-  }
-
-  /**
-   * Become a candidate and start leader election
-   */
-  long initElection() {
-    votedFor = selfId;
-    leaderId = null;
-    return ++currentTerm;
-  }
-
-  void persistMetadata() throws IOException {
-    this.log.writeMetadata(currentTerm, votedFor);
-  }
-
-  void resetLeaderAndVotedFor() {
-    votedFor = null;
-    leaderId = null;
-  }
-
-  /**
-   * Vote for a candidate and update the local state.
-   */
-  void grantVote(String candidateId) {
-    votedFor = candidateId;
-    leaderId = null;
-  }
-
-  void setLeader(String leaderId) {
-    this.leaderId = leaderId;
-  }
-
-  void becomeLeader() {
-    leaderId = selfId;
-  }
-
-  public RaftLog getLog() {
-    return log;
-  }
-
-  long applyLog(TransactionContext operation) throws IOException {
-    return log.append(currentTerm, operation);
-  }
-
-  /**
-   * Check if accept the leader selfId and term from the incoming AppendEntries rpc.
-   * If accept, update the current state.
-   * @return true if the check passes
-   */
-  boolean recognizeLeader(String leaderId, long leaderTerm) {
-    if (leaderTerm < currentTerm) {
-      return false;
-    } else if (leaderTerm > currentTerm || this.leaderId == null) {
-      // If the request indicates a term that is greater than the current term
-      // or no leader has been set for the current term, make sure to update
-      // leader and term later
-      return true;
-    }
-    Preconditions.checkArgument(this.leaderId.equals(leaderId),
-        "selfId:%s, this.leaderId:%s, received leaderId:%s",
-        selfId, this.leaderId, leaderId);
-    return true;
-  }
-
-  /**
-   * Check if the candidate's term is acceptable
-   */
-  boolean recognizeCandidate(String candidateId,
-      long candidateTerm) {
-    if (candidateTerm > currentTerm) {
-      return true;
-    } else if (candidateTerm == currentTerm) {
-      // has not voted yet or this is a retry
-      return votedFor == null || votedFor.equals(candidateId);
-    }
-    return false;
-  }
-
-  boolean isLogUpToDate(TermIndex candidateLastEntry) {
-    LogEntryProto lastEntry = log.getLastEntry();
-    // need to take into account snapshot
-    SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
-     if (lastEntry == null && snapshot == null) {
-      return true;
-    } else if (candidateLastEntry == null) {
-      return false;
-    }
-    TermIndex local = ServerProtoUtils.toTermIndex(lastEntry);
-    if (local == null || (snapshot != null && snapshot.getIndex() > lastEntry.getIndex())) {
-      local = snapshot.getTermIndex();
-    }
-    return local.compareTo(candidateLastEntry) <= 0;
-  }
-
-  @Override
-  public String toString() {
-    return selfId + ":t" + currentTerm + ", leader=" + leaderId
-        + ", voted=" + votedFor + ", raftlog=" + log + ", conf=" + getRaftConf();
-  }
-
-  boolean isConfCommitted() {
-    return getLog().getLastCommittedIndex() >=
-        getRaftConf().getLogEntryIndex();
-  }
-
-  public void setRaftConf(long logIndex, RaftConfiguration conf) {
-    configurationManager.addConfiguration(logIndex, conf);
-    RaftServerImpl.LOG.info("{}: successfully update the configuration {}",
-        getSelfId(), conf);
-  }
-
-  void updateConfiguration(LogEntryProto[] entries) {
-    if (entries != null && entries.length > 0) {
-      configurationManager.removeConfigurations(entries[0].getIndex());
-      for (LogEntryProto entry : entries) {
-        if (ProtoUtils.isConfigurationLogEntry(entry)) {
-          final RaftConfiguration conf = ServerProtoUtils.toRaftConfiguration(
-              entry.getIndex(), entry.getConfigurationEntry());
-          configurationManager.addConfiguration(entry.getIndex(), conf);
-          server.getServerRpc().addPeers(conf.getPeers());
-        }
-      }
-    }
-  }
-
-  void updateStatemachine(long majorityIndex, long currentTerm) {
-    log.updateLastCommitted(majorityIndex, currentTerm);
-    stateMachineUpdater.notifyUpdater();
-  }
-
-  void reloadStateMachine(long lastIndexInSnapshot, long currentTerm)
-      throws IOException {
-    log.updateLastCommitted(lastIndexInSnapshot, currentTerm);
-
-    stateMachineUpdater.reloadStateMachine();
-  }
-
-  @Override
-  public void close() throws IOException {
-    stateMachineUpdater.stop();
-    RaftServerImpl.LOG.info("{} closes. The last applied log index is {}",
-        getSelfId(), getLastAppliedIndex());
-    storage.close();
-  }
-
-  @VisibleForTesting
-  public RaftStorage getStorage() {
-    return storage;
-  }
-
-  void installSnapshot(InstallSnapshotRequestProto request) throws IOException {
-    // TODO: verify that we need to install the snapshot
-    StateMachine sm = server.getStateMachine();
-    sm.pause(); // pause the SM to prepare for install snapshot
-    snapshotManager.installSnapshot(sm, request);
-    log.syncWithSnapshot(request.getTermIndex().getIndex());
-    this.latestInstalledSnapshot = ServerProtoUtils.toTermIndex(
-        request.getTermIndex());
-  }
-
-  SnapshotInfo getLatestSnapshot() {
-    return server.getStateMachine().getStateMachineStorage().getLatestSnapshot();
-  }
-
-  public TermIndex getLatestInstalledSnapshot() {
-    return latestInstalledSnapshot;
-  }
-
-  @VisibleForTesting
-  public long getLastAppliedIndex() {
-    return stateMachineUpdater.getLastAppliedIndex();
-  }
-
-  boolean isCurrentConfCommitted() {
-    return getRaftConf().getLogEntryIndex() <= getLog().getLastCommittedIndex();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/ServerUtils.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/ServerUtils.java b/raft-server/src/main/java/org/apache/raft/server/impl/ServerUtils.java
deleted file mode 100644
index ac21386..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/ServerUtils.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.raft.server.protocol.TermIndex;
-
-/** Server utilities for internal use. */
-public class ServerUtils {
-  public static TermIndex newTermIndex(long term, long index) {
-    return new TermIndexImpl(term, index);
-  }
-
-  private static class TermIndexImpl implements TermIndex {
-    private final long term;
-    private final long index; //log index; first index is 1.
-
-    TermIndexImpl(long term, long logIndex) {
-      this.term = term;
-      this.index = logIndex;
-    }
-
-    @Override
-    public long getTerm() {
-      return term;
-    }
-
-    @Override
-    public long getIndex() {
-      return index;
-    }
-
-    @Override
-    public int compareTo(TermIndex that) {
-      final int d = Long.compare(this.getTerm(), that.getTerm());
-      return d != 0 ? d : Long.compare(this.getIndex(), that.getIndex());
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      } else if (obj == null || !(obj instanceof TermIndexImpl)) {
-        return false;
-      }
-
-      final TermIndexImpl that = (TermIndexImpl) obj;
-      return this.getTerm() == that.getTerm()
-          && this.getIndex() == that.getIndex();
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder().append(term).append(index).hashCode();
-    }
-
-    private static String toString(long n) {
-      return n < 0 ? "~" : "" + n;
-    }
-
-    @Override
-    public String toString() {
-      return "(t:" + toString(term) + ", i:" + toString(index) + ")";
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java b/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java
deleted file mode 100644
index f85639b..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.Message;
-import org.apache.raft.server.RaftServerConfigKeys;
-import org.apache.raft.server.storage.RaftLog;
-import org.apache.raft.server.storage.RaftStorage;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.Daemon;
-import org.apache.raft.util.ExitUtils;
-import org.apache.raft.util.LifeCycle;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
-import static org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY;
-
-/**
- * This class tracks the log entries that have been committed in a quorum and
- * applies them to the state machine. We let a separate thread do this work
- * asynchronously so that this will not block normal raft protocol.
- *
- * If the auto log compaction is enabled, the state machine updater thread will
- * trigger a snapshot of the state machine by calling
- * {@link StateMachine#takeSnapshot} when the log size exceeds a limit.
- */
-class StateMachineUpdater implements Runnable {
-  static final Logger LOG = LoggerFactory.getLogger(StateMachineUpdater.class);
-
-  enum State {
-    RUNNING, STOP, RELOAD
-  }
-
-  private final RaftProperties properties;
-  private final StateMachine stateMachine;
-  private final RaftServerImpl server;
-  private final RaftLog raftLog;
-
-  private volatile long lastAppliedIndex;
-
-  private final boolean autoSnapshotEnabled;
-  private final long snapshotThreshold;
-  private long lastSnapshotIndex;
-
-  private final Thread updater;
-  private volatile State state = State.RUNNING;
-
-  StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
-      RaftLog raftLog, long lastAppliedIndex, RaftProperties properties) {
-    this.properties = properties;
-    this.stateMachine = stateMachine;
-    this.server = server;
-    this.raftLog = raftLog;
-
-    this.lastAppliedIndex = lastAppliedIndex;
-    lastSnapshotIndex = lastAppliedIndex;
-
-    autoSnapshotEnabled = properties.getBoolean(
-        RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY,
-        RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_DEFAULT);
-    snapshotThreshold = properties.getLong(
-        RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY,
-        RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_DEFAULT);
-    updater = new Daemon(this);
-  }
-
-  void start() {
-    updater.start();
-  }
-
-  void stop() {
-    state = State.STOP;
-    updater.interrupt();
-    try {
-      stateMachine.close();
-    } catch (IOException ignored) {
-    }
-  }
-
-  void reloadStateMachine() {
-    state = State.RELOAD;
-    notifyUpdater();
-  }
-
-  synchronized void notifyUpdater() {
-    notifyAll();
-  }
-
-  @Override
-  public String toString() {
-    return this.getClass().getSimpleName() + "-" + raftLog.getSelfId();
-  }
-
-  @Override
-  public void run() {
-    final RaftStorage storage = server.getState().getStorage();
-    while (isRunning()) {
-      try {
-        synchronized (this) {
-          // when the peers just start, the committedIndex is initialized as 0
-          // and will be updated only after the leader contacts other peers.
-          // Thus initially lastAppliedIndex can be greater than lastCommitted.
-          while (lastAppliedIndex >= raftLog.getLastCommittedIndex()) {
-            wait();
-          }
-        }
-
-        final long committedIndex = raftLog.getLastCommittedIndex();
-        Preconditions.checkState(lastAppliedIndex < committedIndex);
-
-        if (state == State.RELOAD) {
-          Preconditions.checkState(stateMachine.getLifeCycleState() == LifeCycle.State.PAUSED);
-
-          stateMachine.reinitialize(server.getId(), properties, storage);
-
-          SnapshotInfo snapshot = stateMachine.getLatestSnapshot();
-          Preconditions.checkState(snapshot != null && snapshot.getIndex() > lastAppliedIndex,
-              "Snapshot: %s, lastAppliedIndex: %s", snapshot, lastAppliedIndex);
-
-          lastAppliedIndex = snapshot.getIndex();
-          lastSnapshotIndex = snapshot.getIndex();
-          state = State.RUNNING;
-        }
-
-        while (lastAppliedIndex < committedIndex) {
-          final LogEntryProto next = raftLog.get(lastAppliedIndex + 1);
-          if (next != null) {
-            if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
-              // the reply should have already been set. only need to record
-              // the new conf in the state machine.
-              stateMachine.setRaftConfiguration(
-                  ServerProtoUtils.toRaftConfiguration(next.getIndex(),
-                      next.getConfigurationEntry()));
-            } else if (next.getLogEntryBodyCase() == SMLOGENTRY) {
-              // check whether there is a TransactionContext because we are the leader.
-              TransactionContext trx = server.getTransactionContext(next.getIndex());
-              if (trx == null) {
-                trx = new TransactionContext(stateMachine, next);
-              }
-
-              // Let the StateMachine inject logic for committed transactions in sequential order.
-              trx = stateMachine.applyTransactionSerial(trx);
-
-              // TODO: This step can be parallelized
-              CompletableFuture<Message> messageFuture =
-                  stateMachine.applyTransaction(trx);
-              server.replyPendingRequest(next.getIndex(), messageFuture);
-            }
-            lastAppliedIndex++;
-          } else {
-            LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}",
-                this, lastAppliedIndex + 1, state);
-            break;
-          }
-        }
-
-        // check if need to trigger a snapshot
-        if (shouldTakeSnapshot(lastAppliedIndex)) {
-          stateMachine.takeSnapshot();
-          // TODO purge logs, including log cache. but should keep log for leader's RPCSenders
-          lastSnapshotIndex = lastAppliedIndex;
-        }
-      } catch (InterruptedException e) {
-        if (!isRunning()) {
-          LOG.info("{}: the StateMachineUpdater is interrupted and will exit.", this);
-        } else {
-          final String s = this + ": the StateMachineUpdater is wrongly interrupted";
-          ExitUtils.terminate(1, s, e, LOG);
-        }
-      } catch (Throwable t) {
-        final String s = this + ": the StateMachineUpdater hits Throwable";
-        ExitUtils.terminate(2, s, t, LOG);
-      }
-    }
-  }
-
-  private boolean isRunning() {
-    return state != State.STOP;
-  }
-
-  private boolean shouldTakeSnapshot(long currentAppliedIndex) {
-    return autoSnapshotEnabled && (state != State.RELOAD) &&
-        (currentAppliedIndex - lastSnapshotIndex >= snapshotThreshold);
-  }
-
-  long getLastAppliedIndex() {
-    return lastAppliedIndex;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/protocol/RaftServerProtocol.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/protocol/RaftServerProtocol.java b/raft-server/src/main/java/org/apache/raft/server/protocol/RaftServerProtocol.java
deleted file mode 100644
index 59e9bba..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/protocol/RaftServerProtocol.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.protocol;
-
-import org.apache.raft.shaded.proto.RaftProtos.*;
-
-import java.io.IOException;
-
-/**
- * The request/reply operations of a Raft server, one method per proto pair:
- * vote requests, log appends and snapshot installation.
- * Each method takes the request proto and returns the matching reply proto.
- * NOTE(review): presumably these are the calls exchanged between the servers of
- * a group -- confirm against the RPC layer that implements this interface.
- */
-public interface RaftServerProtocol {
-
-  /**
-   * @param request the vote request.
-   * @return the reply to the vote request.
-   * @throws IOException if the request cannot be processed.
-   */
-  RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException;
-
-  /**
-   * @param request the append-entries request.
-   * @return the reply to the append-entries request.
-   * @throws IOException if the request cannot be processed.
-   */
-  AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException;
-
-  /**
-   * @param request the install-snapshot request.
-   * @return the reply to the install-snapshot request.
-   * @throws IOException if the request cannot be processed.
-   */
-  InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/protocol/TermIndex.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/protocol/TermIndex.java b/raft-server/src/main/java/org/apache/raft/server/protocol/TermIndex.java
deleted file mode 100644
index df401d6..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/protocol/TermIndex.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.protocol;
-
-import org.apache.raft.server.impl.ServerUtils;
-
-/**
- * A (term, log index) pair as defined in the Raft consensus algorithm.
- * Instances can be compared with each other through {@link Comparable}.
- */
-public interface TermIndex extends Comparable<TermIndex> {
-  /**
-   * Create a new {@link TermIndex} instance.
-   * The construction itself is delegated to {@code ServerUtils.newTermIndex}.
-   *
-   * @param term the term of the new instance.
-   * @param index the log index of the new instance.
-   * @return the new instance.
-   */
-  static TermIndex newTermIndex(long term, long index) {
-    final TermIndex created = ServerUtils.newTermIndex(term, index);
-    return created;
-  }
-
-  /** @return the term. */
-  long getTerm();
-
-  /** @return the index. */
-  long getIndex();
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/BufferedChannelBase.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/BufferedChannelBase.java b/raft-server/src/main/java/org/apache/raft/server/storage/BufferedChannelBase.java
deleted file mode 100644
index 4440be9..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/BufferedChannelBase.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-
-/**
- * Base class of the buffered wrappers around a {@link FileChannel}.
- * It only holds the channel and offers checked access to it;
- * {@link Closeable#close()} is left to the subclasses.
- */
-public abstract class BufferedChannelBase implements Closeable {
-  protected final FileChannel fileChannel;
-
-  /** @param fc the channel to wrap. */
-  protected BufferedChannelBase(FileChannel fc) {
-    this.fileChannel = fc;
-  }
-
-  /**
-   * Get the {@link FileChannel} that this BufferedChannel wraps around.
-   * No check is made on whether the channel is still open.
-   */
-  public FileChannel getFileChannel() {
-    return fileChannel;
-  }
-
-  /**
-   * @return the wrapped channel, after checking that it is still open.
-   * @throws IOException if the channel has already been closed.
-   */
-  protected FileChannel validateAndGetFileChannel() throws IOException {
-    if (fileChannel.isOpen()) {
-      return fileChannel;
-    }
-    throw new IOException(
-        "Attempting to access a file channel that has already been closed");
-  }
-
-  /**
-   * Get the current size of the underlying FileChannel.
-   *
-   * @throws IOException if the channel is closed or its size cannot be read.
-   */
-  public long size() throws IOException {
-    final FileChannel openChannel = validateAndGetFileChannel();
-    return openChannel.size();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/BufferedWriteChannel.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/BufferedWriteChannel.java b/raft-server/src/main/java/org/apache/raft/server/storage/BufferedWriteChannel.java
deleted file mode 100644
index 6c662d1..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/BufferedWriteChannel.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Provides a buffering layer in front of a FileChannel for writing.
- *
- * Bytes are collected in a direct {@link ByteBuffer} of {@code writeCapacity}
- * bytes and handed to the {@link FileChannel} whenever the buffer becomes full
- * or {@link #flush(boolean)} is called. Only {@link #flush(boolean)} takes the
- * monitor of this object; the {@code write} methods do not synchronize.
- */
-public class BufferedWriteChannel extends BufferedChannelBase {
-  // The capacity of the write buffer.
-  private final int writeCapacity;
-  // The position of the file channel's write pointer.
-  private final AtomicLong writeBufferStartPosition = new AtomicLong(0);
-  // The buffer used to write operations.
-  private final ByteBuffer writeBuffer;
-  // The absolute position of the next write operation.
-  private volatile long position;
-
-  /**
-   * @param fc the channel to write to; writing starts at its current position.
-   * @param writeCapacity the size of the write buffer in bytes; must be positive.
-   * @throws IllegalArgumentException if {@code writeCapacity} is not positive.
-   * @throws IOException if the current position of {@code fc} cannot be read.
-   */
-  public BufferedWriteChannel(FileChannel fc, int writeCapacity)
-      throws IOException {
-    super(fc);
-    // A zero-capacity buffer can never accept a byte: write(byte[]) and
-    // write(ByteBuffer) would keep flushing without ever making progress.
-    if (writeCapacity <= 0) {
-      throw new IllegalArgumentException(
-          "writeCapacity must be positive but was " + writeCapacity);
-    }
-    this.writeCapacity = writeCapacity;
-    this.position = fc.position();
-    this.writeBufferStartPosition.set(position);
-    this.writeBuffer = ByteBuffer.allocateDirect(writeCapacity);
-  }
-
-  /**
-   * Write all the data in src to the {@link FileChannel}. Note that this function can
-   * buffer or re-order writes based on the implementation. These writes will be flushed
-   * to the disk only when flush() is invoked.
-   *
-   * The limit of {@code src} is temporarily lowered while copying and is always
-   * restored, even if the copy fails.
-   *
-   * @param src The source ByteBuffer which contains the data to be written.
-   * @throws IOException if a write operation fails.
-   */
-  public void write(ByteBuffer src) throws IOException {
-    int copied = 0;
-    while (src.hasRemaining()) {
-      final int originalLimit = src.limit();
-      // copy no more than what still fits into the write buffer
-      final int chunk = Math.min(src.remaining(), writeBuffer.remaining());
-      src.limit(src.position() + chunk);
-      try {
-        writeBuffer.put(src);
-      } finally {
-        src.limit(originalLimit);
-      }
-      copied += chunk;
-      // if we have run out of buffer space, we should flush to the file
-      if (!writeBuffer.hasRemaining()) {
-        flushInternal();
-      }
-    }
-    position += copied;
-  }
-
-  /**
-   * Write the specified byte.
-   * @param b the byte to be written
-   * @throws IOException if the buffer became full and flushing it fails.
-   */
-  public void write(int b) throws IOException {
-    writeBuffer.put((byte) b);
-    if (!writeBuffer.hasRemaining()) {
-      flushInternal();
-    }
-    position++;
-  }
-
-  /**
-   * Write the whole byte array, flushing to the file each time the buffer fills up.
-   * @param b the bytes to be written
-   * @throws IOException if a flush fails.
-   */
-  public void write(byte[] b) throws IOException {
-    int offset = 0;
-    while (offset < b.length) {
-      int toPut = Math.min(b.length - offset, writeBuffer.remaining());
-      writeBuffer.put(b, offset, toPut);
-      offset += toPut;
-      if (!writeBuffer.hasRemaining()) {
-        flushInternal();
-      }
-    }
-    position += b.length;
-  }
-
-  /**
-   * Get the position where the next write operation will begin writing from.
-   */
-  public long position() {
-    return position;
-  }
-
-  /**
-   * Get the position of the file channel's write pointer.
-   */
-  public long getFileChannelPosition() {
-    return writeBufferStartPosition.get();
-  }
-
-
-  /**
-   * Write any data in the buffer to the file. If shouldForceWrite is set to
-   * true, also force the file channel (content only, without metadata) so
-   * that data is persisted to the disk.
-   *
-   * @param shouldForceWrite whether to force the channel after flushing.
-   * @throws IOException if the write or sync operation fails.
-   */
-  public void flush(boolean shouldForceWrite) throws IOException {
-    synchronized (this) {
-      flushInternal();
-    }
-    if (shouldForceWrite) {
-      forceWrite(false);
-    }
-  }
-
-  /**
-   * Write any data in the buffer to the file and advance the writeBufferPosition
-   * Callers are expected to synchronize appropriately
-   *
-   * @throws IOException if the write fails.
-   */
-  private void flushInternal() throws IOException {
-    writeBuffer.flip();
-    // a single write() call may consume only part of the buffer
-    do {
-      fileChannel.write(writeBuffer);
-    } while (writeBuffer.hasRemaining());
-    writeBuffer.clear();
-    writeBufferStartPosition.set(fileChannel.position());
-  }
-
-  /**
-   * Force the file channel.
-   *
-   * @param forceMetadata passed on to {@link FileChannel#force(boolean)}.
-   * @return the file channel position recorded by the last flush before the force.
-   * @throws IOException if forcing the channel fails.
-   */
-  public long forceWrite(boolean forceMetadata) throws IOException {
-    // This is the point up to which we had flushed to the file system page cache
-    // before issuing this force write hence is guaranteed to be made durable by
-    // the force write, any flush that happens after this may or may
-    // not be flushed
-    long positionForceWrite = writeBufferStartPosition.get();
-    fileChannel.force(forceMetadata);
-    return positionForceWrite;
-  }
-
-  /**
-   * Close the file channel and reset the write buffer.
-   * Bytes that are still buffered are NOT written; call
-   * {@link #flush(boolean)} first if they must reach the file.
-   */
-  @Override
-  public void close() throws IOException {
-    fileChannel.close();
-    writeBuffer.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/FileInfo.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/FileInfo.java b/raft-server/src/main/java/org/apache/raft/server/storage/FileInfo.java
deleted file mode 100644
index cdeb622..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/FileInfo.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import org.apache.raft.io.MD5Hash;
-
-import java.nio.file.Path;
-
-/**
- * Metadata about a file: its path, its MD5 digest and its size.
- *
- * The size is read from the file system once, when the object is created.
- * The objects of this class are immutable.
- */
-public class FileInfo {
-  private final Path path;
-  private final MD5Hash fileDigest;
-  private final long fileSize;
-
-  /**
-   * @param path the path of the file; its current length is recorded as the file size.
-   * @param fileDigest the MD5 digest of the file.
-   */
-  public FileInfo(Path path, MD5Hash fileDigest) {
-    this.fileSize = path.toFile().length();
-    this.fileDigest = fileDigest;
-    this.path = path;
-  }
-
-  /** @return the path of the file. */
-  public Path getPath() {
-    return this.path;
-  }
-
-  /** @return the MD5 file digest of the file. */
-  public MD5Hash getFileDigest() {
-    return this.fileDigest;
-  }
-
-  /** @return the size of the file. */
-  public long getFileSize() {
-    return this.fileSize;
-  }
-
-  /** @return the string form of the path. */
-  @Override
-  public String toString() {
-    return getPath().toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java b/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java
deleted file mode 100644
index 95597b2..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-
-import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
-
-/**
- * Reads {@link LogEntryProto}s from a single log file.
- *
- * The stream is opened lazily: the file is opened and its header verified on
- * the first call of {@link #nextEntry()}. If that initialization fails, the
- * reader is closed again and the stream becomes CLOSED, so that later calls of
- * {@link #nextEntry()} return null instead of re-opening the file.
- */
-public class LogInputStream implements Closeable {
-  static final Logger LOG = LoggerFactory.getLogger(LogInputStream.class);
-
-  /** The result of scanning a log file with {@code scanEditLog}. */
-  static class LogValidation {
-    private final long validLength;
-    private final long endIndex;
-    private final boolean hasCorruptHeader;
-
-    LogValidation(long validLength, long endIndex, boolean hasCorruptHeader) {
-      this.validLength = validLength;
-      this.endIndex = endIndex;
-      this.hasCorruptHeader = hasCorruptHeader;
-    }
-
-    /** @return the stream position reported by the scan. */
-    long getValidLength() {
-      return validLength;
-    }
-
-    /** @return the highest entry index seen by the scan. */
-    long getEndIndex() {
-      return endIndex;
-    }
-
-    /** @return true if the header of the file could not be read. */
-    boolean hasCorruptHeader() {
-      return hasCorruptHeader;
-    }
-  }
-
-  private enum State {
-    UNINIT,
-    OPEN,
-    CLOSED
-  }
-
-  private final File logFile;
-  private final long startIndex;
-  private final long endIndex;
-  private final boolean isOpen;
-  private State state = State.UNINIT;
-  private LogReader reader;
-
-  /**
-   * @param log the log file to read.
-   * @param startIndex the index of the first entry.
-   * @param endIndex the index of the last entry; it must be INVALID_LOG_INDEX
-   *                 when isOpen is true, and at least startIndex otherwise.
-   * @param isOpen the value later reported by {@link #isOpen()}.
-   */
-  public LogInputStream(File log, long startIndex, long endIndex,
-      boolean isOpen) {
-    if (isOpen) {
-      Preconditions.checkArgument(endIndex == INVALID_LOG_INDEX);
-    } else {
-      Preconditions.checkArgument(endIndex >= startIndex);
-    }
-
-    this.logFile = log;
-    this.startIndex = startIndex;
-    this.endIndex = endIndex;
-    this.isOpen = isOpen;
-  }
-
-  /**
-   * Open the reader and verify the log header.
-   * On success the state becomes OPEN. On any failure the reader (if it was
-   * created) is closed again and the state becomes CLOSED, so that no file
-   * handle is leaked and the initialization is not retried.
-   *
-   * @throws IOException if the file cannot be opened or its header cannot be read.
-   * @throws IllegalStateException if the header does not match the expected one.
-   */
-  private void init() throws IOException {
-    Preconditions.checkState(state == State.UNINIT);
-    boolean initialized = false;
-    try {
-      reader = new LogReader(logFile);
-      // read the log header
-      String header = reader.readLogHeader();
-      Preconditions.checkState(SegmentedRaftLog.HEADER_STR.equals(header),
-          "Corrupted log header: %s", header);
-      state = State.OPEN;
-      initialized = true;
-    } finally {
-      if (!initialized) {
-        if (reader != null) {
-          try {
-            reader.close();
-          } catch (Exception e) {
-            // keep the original failure; only report the secondary one
-            LOG.warn("Failed to close the reader of " + this
-                + " after a failed initialization", e);
-          }
-          reader = null;
-        }
-        state = State.CLOSED;
-      }
-    }
-  }
-
-  long getStartIndex() {
-    return startIndex;
-  }
-
-  long getEndIndex() {
-    return endIndex;
-  }
-
-  String getName() {
-    return logFile.getName();
-  }
-
-  /**
-   * Read the next entry, initializing the stream first if necessary.
-   *
-   * @return the next entry, or null if there is none or the stream is closed.
-   * @throws IOException if initializing the stream or reading the entry fails.
-   */
-  public LogEntryProto nextEntry() throws IOException {
-    LogEntryProto entry = null;
-    switch (state) {
-      case UNINIT:
-        try {
-          init();
-        } catch (Throwable e) {
-          LOG.error("caught exception initializing " + this, e);
-          Throwables.propagateIfPossible(e, IOException.class);
-        }
-        Preconditions.checkState(state != State.UNINIT);
-        return nextEntry();
-      case OPEN:
-        entry = reader.readEntry();
-        if (entry != null) {
-          long index = entry.getIndex();
-          if (!isOpen() && index >= endIndex) {
-            /*
-             * The end index may be derived from the segment recovery
-             * process. It is possible that we still have some uncleaned garbage
-             * in the end. We should skip them.
-             */
-            long skipAmt = logFile.length() - reader.getPos();
-            if (skipAmt > 0) {
-              LOG.debug("skipping {} bytes at the end of log '{}': reached" +
-                  " entry {} out of {}", skipAmt, getName(), index, endIndex);
-              reader.skipFully(skipAmt);
-            }
-          }
-        }
-        break;
-      case CLOSED:
-        break; // return null
-    }
-    return entry;
-  }
-
-  /**
-   * Scan the next entry through the reader; the stream must be OPEN.
-   *
-   * @return the value returned by the reader's {@code scanEntry()}.
-   */
-  long scanNextEntry() throws IOException {
-    Preconditions.checkState(state == State.OPEN);
-    return reader.scanEntry();
-  }
-
-  /** @return the position of the reader if the stream is OPEN, otherwise 0. */
-  long getPosition() {
-    if (state == State.OPEN) {
-      return reader.getPos();
-    } else {
-      return 0;
-    }
-  }
-
-  /**
-   * Close the reader if the stream is OPEN. The state becomes CLOSED in any
-   * case, even if closing the reader fails.
-   */
-  @Override
-  public void close() throws IOException {
-    try {
-      if (state == State.OPEN) {
-        reader.close();
-      }
-    } finally {
-      state = State.CLOSED;
-    }
-  }
-
-  boolean isOpen() {
-    return isOpen;
-  }
-
-  @Override
-  public String toString() {
-    return getName();
-  }
-
-  /**
-   * Open the given file, scan it and close it again.
-   *
-   * @param file          File being scanned and validated.
-   * @param maxTxIdToScan Maximum entry index to try to scan.
-   *                      The scan returns after reading this or a higher
-   *                      index. The file portion beyond this index is
-   *                      potentially being updated.
-   * @return Result of the validation. If the header cannot be read because the
-   *         file ends too early, a result with valid length 0 and
-   *         hasCorruptHeader set is returned.
-   * @throws IOException if opening the file or reading its header fails for
-   *                     a reason other than reaching the end of the file.
-   */
-  static LogValidation scanEditLog(File file, long maxTxIdToScan)
-      throws IOException {
-    final LogInputStream in =
-        new LogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false);
-    try {
-      try {
-        // read the header, initialize the inputstream
-        in.init();
-      } catch (EOFException e) {
-        LOG.warn("Log file " + file + " has no valid header", e);
-        return new LogValidation(0, INVALID_LOG_INDEX, true);
-      }
-      return scanEditLog(in, maxTxIdToScan);
-    } finally {
-      // also covers the paths on which init() failed
-      RaftUtils.cleanup(LOG, in);
-    }
-  }
-
-  /**
-   * Find the last valid entry index in the stream.
-   * If there are invalid or corrupt entries in the middle of the stream,
-   * scanEditLog will skip over them.
-   *
-   * This reads through the stream but does not close it.
-   *
-   * @param in             The stream to scan.
-   * @param maxIndexToScan Maximum entry index to try to scan. The scan returns
-   *                       after reading this or a higher index. The file
-   *                       portion beyond this index is potentially being
-   *                       updated.
-   * @return the stream position recorded at the start of the last loop
-   *         iteration, together with the highest index seen.
-   */
-  static LogValidation scanEditLog(LogInputStream in, long maxIndexToScan) {
-    long lastPos = 0;
-    long end = INVALID_LOG_INDEX;
-    long numValid = 0;
-    boolean hitError = false;
-    // NOTE(review): if nextEntry() keeps failing without advancing the reader,
-    // this loop does not terminate -- confirm that the reader always makes
-    // progress after an error.
-    while (end < maxIndexToScan) {
-      long index;
-      lastPos = in.getPosition();
-      try {
-        if (hitError) {
-          // after a failure, fall back to fully reading the next entry
-          LogEntryProto entry = in.nextEntry();
-          index = entry != null ? entry.getIndex() : INVALID_LOG_INDEX;
-          LOG.warn("After resync, position is " + in.getPosition());
-        } else {
-          index = in.scanNextEntry();
-        }
-        if (index == INVALID_LOG_INDEX) {
-          break;
-        } else {
-          hitError = false;
-        }
-      } catch (Throwable t) {
-        LOG.warn("Caught exception after scanning through {} ops from {}"
-            + " while determining its valid length. Position was {}",
-            numValid, in, lastPos, t);
-        hitError = true;
-        continue;
-      }
-      if (end == INVALID_LOG_INDEX || index > end) {
-        end = index;
-      }
-      numValid++;
-    }
-    return new LogValidation(lastPos, end, false);
-  }
-}