You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/01/31 21:17:03 UTC

[18/54] [abbrv] incubator-ratis git commit: Renamed the packages from raft to ratis in preparation for Apache Incubation - Moved all java packages from org.apache.raft to org.apache.ratis. - Moved native package to org_apache_ratis, and native lib to l

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
new file mode 100644
index 0000000..9b2932c
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.io.Charsets;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.impl.ConfigurationManager;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.util.AutoCloseableLock;
+import org.apache.ratis.util.CodeInjectionForTesting;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * The RaftLog implementation that writes log entries into segmented files in
+ * local disk.
+ *
+ * The max log segment size is 8MB. The real log segment size may not be
+ * exactly equal to this limit. If a log entry's size exceeds 8MB, this entry
+ * will be stored in a single segment.
+ *
+ * There are two types of segments: closed segment and open segment. The former
+ * is named as "log_startindex-endindex", the latter is named as
+ * "log_inprogress_startindex".
+ *
+ * There can be multiple closed segments but there is at most one open segment.
+ * When the open segment reaches the size limit, or the log term increases, we
+ * close the open segment and start a new open segment. A closed segment cannot
+ * be appended anymore, but it can be truncated in case that a follower's log is
+ * inconsistent with the current leader.
+ *
+ * Every closed segment should be non-empty, i.e., it should contain at least
+ * one entry.
+ *
+ * There should not be any gap between segments. The first segment may not start
+ * from index 0 since there may be snapshots as log compaction. The last index
+ * in segments should be no smaller than the last index of snapshot, otherwise
+ * we may have a hole when appending further log entries.
+ */
+public class SegmentedRaftLog extends RaftLog {
+  static final String HEADER_STR = "RAFTLOG1";
+  static final byte[] HEADER_BYTES = HEADER_STR.getBytes(Charsets.UTF_8); // UTF-8 encoding of HEADER_STR
+
+  /**
+   * Base class for I/O tasks; a caller can block in waitForDone() until done() is invoked.
+   */
+  static abstract class Task {
+    private boolean done = false; // only read/written inside the synchronized methods below
+
+    synchronized void done() {
+      done = true;
+      notifyAll(); // wake every thread blocked in waitForDone()
+    }
+
+    synchronized void waitForDone() throws InterruptedException {
+      while (!done) { // re-check the flag after every wakeup
+        wait();
+      }
+    }
+
+    abstract void execute() throws IOException;
+
+    abstract long getEndIndex();
+
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + "-" + getEndIndex(); // "<simple class name>-<end index>"
+    }
+  }
+  private static final ThreadLocal<Task> myTask = new ThreadLocal<>(); // per-thread: set by truncate()/appendEntry(), awaited by logSync()
+
+  private final RaftStorage storage;
+  private final RaftLogCache cache; // in-memory view of the segments; all reads in this class are served from it
+  private final RaftLogWorker fileLogWorker; // receives the file-level operations (start/roll/write/truncate)
+  private final long segmentMaxSize; // read from RAFT_LOG_SEGMENT_MAX_SIZE_KEY in the constructor
+
+  public SegmentedRaftLog(String selfId, RaftServerImpl server, RaftStorage storage,
+                          long lastIndexInSnapshot, RaftProperties properties) throws IOException {
+    super(selfId);
+    this.storage = storage;
+    this.segmentMaxSize = properties.getLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY,
+        RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT);
+    cache = new RaftLogCache();
+    fileLogWorker = new RaftLogWorker(server, storage, properties);
+    lastCommitted.set(lastIndexInSnapshot); // lastCommitted is not declared here; presumably inherited from RaftLog
+  }
+
+  @Override
+  public void open(ConfigurationManager confManager, long lastIndexInSnapshot) // loads segments, then starts the file worker
+      throws IOException {
+    loadLogSegments(confManager, lastIndexInSnapshot);
+    File openSegmentFile = null; // stays null when the cache has no open segment
+    if (cache.getOpenSegment() != null) {
+      openSegmentFile = storage.getStorageDir()
+          .getOpenLogFile(cache.getOpenSegment().getStartIndex());
+    }
+    fileLogWorker.start(Math.max(cache.getEndIndex(), lastIndexInSnapshot), // the larger of cached end index and snapshot index
+        openSegmentFile);
+    super.open(confManager, lastIndexInSnapshot);
+  }
+
+  @Override
+  public long getStartIndex() {
+    return cache.getStartIndex();
+  }
+
+  private void loadLogSegments(ConfigurationManager confManager, // fills the cache from the segment files, under the write lock
+      long lastIndexInSnapshot) throws IOException {
+    try(AutoCloseableLock writeLock = writeLock()) {
+      List<LogPathAndIndex> paths = storage.getStorageDir().getLogSegmentFiles();
+      for (LogPathAndIndex pi : paths) {
+        LogSegment logSegment = parseLogSegment(pi, confManager);
+        cache.addSegment(logSegment);
+      }
+
+      // if the largest index is smaller than the last index in snapshot, we do
+      // not load the log to avoid holes between log segments. This may happen
+      // when the local I/O worker is too slow to persist log (slower than
+      // committing the log and taking snapshot)
+      if (!cache.isEmpty() && cache.getEndIndex() < lastIndexInSnapshot) {
+        LOG.warn("End log index {} is smaller than last index in snapshot {}",
+            cache.getEndIndex(), lastIndexInSnapshot);
+        cache.clear(); // only the in-memory cache is cleared; the files stay on disk (see TODO)
+        // TODO purge all segment files
+      }
+    }
+  }
+
+  private LogSegment parseLogSegment(LogPathAndIndex pi,
+      ConfigurationManager confManager) throws IOException {
+    final boolean isOpen = pi.endIndex == RaftServerConstants.INVALID_LOG_INDEX; // an invalid end index marks the segment as open
+    return LogSegment.loadSegment(pi.path.toFile(), pi.startIndex, pi.endIndex,
+        isOpen, confManager);
+  }
+
+  @Override
+  public LogEntryProto get(long index) {
+    checkLogState();
+    try(AutoCloseableLock readLock = readLock()) { // cache lookup under the read lock
+      return cache.getEntry(index);
+    }
+  }
+
+  @Override
+  public LogEntryProto[] getEntries(long startIndex, long endIndex) {
+    checkLogState();
+    try(AutoCloseableLock readLock = readLock()) { // cache lookup under the read lock
+      return cache.getEntries(startIndex, endIndex);
+    }
+  }
+
+  @Override
+  public LogEntryProto getLastEntry() {
+    checkLogState();
+    try(AutoCloseableLock readLock = readLock()) { // cache lookup under the read lock
+      return cache.getLastEntry();
+    }
+  }
+
+  /**
+   * The method, along with {@link #appendEntry} and
+   * {@link #append(LogEntryProto...)} need protection of RaftServer's lock.
+   */
+  @Override
+  void truncate(long index) {
+    checkLogState();
+    try(AutoCloseableLock writeLock = writeLock()) {
+      RaftLogCache.TruncationSegments ts = cache.truncate(index);
+      if (ts != null) { // file work is scheduled only when the cache returns truncation segments
+        Task task = fileLogWorker.truncate(ts);
+        myTask.set(task); // remembered so that logSync() on this thread waits for it
+      }
+    }
+  }
+
+  @Override
+  void appendEntry(LogEntryProto entry) {
+    checkLogState();
+    try(AutoCloseableLock writeLock = writeLock()) {
+      final LogSegment currentOpenSegment = cache.getOpenSegment();
+      if (currentOpenSegment == null) { // no open segment: create one starting at this entry's index
+        cache.addSegment(LogSegment.newOpenSegment(entry.getIndex()));
+        fileLogWorker.startLogSegment(getNextIndex()); // NOTE(review): presumably equals entry.getIndex() - confirm
+      } else if (isSegmentFull(currentOpenSegment, entry)) { // size limit: roll the open segment in cache and on disk
+        cache.rollOpenSegment(true);
+        fileLogWorker.rollLogSegment(currentOpenSegment);
+      } else if (currentOpenSegment.numOfEntries() > 0 &&
+          currentOpenSegment.getLastRecord().entry.getTerm() != entry.getTerm()) {
+        // the term changes
+        final long currentTerm = currentOpenSegment.getLastRecord().entry
+            .getTerm();
+        Preconditions.checkState(currentTerm < entry.getTerm(), // the term may only increase
+            "open segment's term %s is larger than the new entry's term %s",
+            currentTerm, entry.getTerm());
+        cache.rollOpenSegment(true);
+        fileLogWorker.rollLogSegment(currentOpenSegment);
+      }
+
+      cache.appendEntry(entry);
+      myTask.set(fileLogWorker.writeLogEntry(entry)); // remembered so that logSync() on this thread waits for this write
+    }
+  }
+
+  private boolean isSegmentFull(LogSegment segment, LogEntryProto entry) {
+    if (segment.getTotalSize() >= segmentMaxSize) { // already at or over the limit
+      return true;
+    } else {
+      final long entrySize = LogSegment.getEntrySize(entry);
+      // if entry size is greater than the max segment size, write it directly
+      // into the current segment
+      return entrySize <= segmentMaxSize &&
+          segment.getTotalSize() + entrySize > segmentMaxSize;
+    }
+  }
+
+  @Override
+  public void append(LogEntryProto... entries) { // skips entries already stored with the same term; truncates at the first term conflict
+    checkLogState();
+    if (entries == null || entries.length == 0) {
+      return;
+    }
+    try(AutoCloseableLock writeLock = writeLock()) { // NOTE(review): truncate()/appendEntry() below take writeLock() again; presumably reentrant - confirm
+      Iterator<LogEntryProto> iter = cache.iterator(entries[0].getIndex());
+      int index = 0; // position in entries[] of the first entry that still has to be appended
+      long truncateIndex = -1; // -1 means no conflicting stored entry was found
+      for (; iter.hasNext() && index < entries.length; index++) {
+        LogEntryProto storedEntry = iter.next();
+        Preconditions.checkState(
+            storedEntry.getIndex() == entries[index].getIndex(),
+            "The stored entry's index %s is not consistent with" +
+                " the received entries[%s]'s index %s", storedEntry.getIndex(),
+            index, entries[index].getIndex());
+
+        if (storedEntry.getTerm() != entries[index].getTerm()) {
+          // we should truncate from the storedEntry's index
+          truncateIndex = storedEntry.getIndex();
+          break; // index is not incremented, so entries[index] is the conflicting one
+        }
+      }
+      if (truncateIndex != -1) {
+        // truncate from truncateIndex
+        truncate(truncateIndex);
+      }
+      // append from entries[index]
+      for (int i = index; i < entries.length; i++) {
+        appendEntry(entries[i]);
+      }
+    }
+  }
+
+  @Override
+  public void logSync() throws InterruptedException { // blocks until the calling thread's last recorded task is done
+    CodeInjectionForTesting.execute(LOG_SYNC, getSelfId(), null);
+    final Task task = myTask.get();
+    if (task != null) { // null when this thread never set a task via truncate()/appendEntry()
+      task.waitForDone();
+    }
+  }
+
+  @Override
+  public long getLatestFlushedIndex() {
+    return fileLogWorker.getFlushedIndex();
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * This operation is protected by the RaftServer's lock
+   */
+  @Override
+  public void writeMetadata(long term, String votedFor) throws IOException {
+    storage.getMetaFile().set(term, votedFor);
+  }
+
+  @Override
+  public Metadata loadMetadata() throws IOException {
+    return new Metadata(storage.getMetaFile().getVotedFor(), // note the argument order: votedFor first, then term
+        storage.getMetaFile().getTerm());
+  }
+
+  @Override
+  public void syncWithSnapshot(long lastSnapshotIndex) {
+    fileLogWorker.syncWithSnapshot(lastSnapshotIndex); // only the worker is notified; the cache is not touched here
+    // TODO purge log files and normal/tmp/corrupt snapshot files
+    // if the last index in snapshot is larger than the index of the last
+    // log entry, we should delete all the log entries and their cache to avoid
+    // gaps between log segments.
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close(); // NOTE(review): if this or the next call throws, the remaining close() calls are skipped
+    fileLogWorker.close();
+    storage.close();
+  }
+
+  @VisibleForTesting
+  RaftLogCache getRaftLogCache() {
+    return cache;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
new file mode 100644
index 0000000..73b2af9
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+import org.apache.ratis.io.MD5Hash;
+import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.MD5FileUtil;
+import org.apache.ratis.util.RaftUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Manage snapshots of a raft peer.
+ * TODO: snapshot should be treated as compaction log thus can be merged into
+ *       RaftLog. In this way we can have a unified getLastTermIndex interface.
+ */
+public class SnapshotManager {
+  private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);
+
+  private final RaftStorage storage;
+  private final String selfId; // used only in the error message of installSnapshot
+
+  public SnapshotManager(RaftStorage storage, String selfId)
+      throws IOException {
+    this.storage = storage;
+    this.selfId = selfId;
+  }
+
+  public void installSnapshot(StateMachine stateMachine, // writes the request's file chunks into a temp dir; renames it when request.getDone()
+      InstallSnapshotRequestProto request) throws IOException {
+    final long lastIncludedIndex = request.getTermIndex().getIndex();
+    final RaftStorageDirectory dir = storage.getStorageDir();
+
+    File tmpDir = dir.getNewTempDir();
+    tmpDir.mkdirs(); // NOTE(review): return value ignored, a failed mkdirs() only surfaces later
+    tmpDir.deleteOnExit();
+
+    LOG.info("Installing snapshot:{}, to tmp dir:{}", request, tmpDir);
+
+    // TODO: Make sure that subsequent requests for the same installSnapshot are coming in order,
+    // and are not lost when whole request cycle is done. Check requestId and requestIndex here
+
+    for (FileChunkProto chunk : request.getFileChunksList()) {
+      SnapshotInfo pi = stateMachine.getLatestSnapshot(); // re-read for every chunk
+      if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) { // refuse to install a snapshot that is not newer
+        throw new IOException("There exists snapshot file "
+            + pi.getFiles() + " in " + selfId
+            + " with endIndex >= lastIncludedIndex " + lastIncludedIndex);
+      }
+
+      String fileName = chunk.getFilename(); // this is relative to the root dir
+      // TODO: assumes flat layout inside SM dir
+      File tmpSnapshotFile = new File(tmpDir,
+          new File(dir.getRoot(), fileName).getName()); // keeps only the last path component of fileName
+
+      FileOutputStream out = null;
+      try {
+        // offset 0 means the first chunk of a file: delete any existing temp
+        // snapshot file with the same name and start over.
+        if (chunk.getOffset() == 0) {
+          if (tmpSnapshotFile.exists()) {
+            FileUtils.fullyDelete(tmpSnapshotFile);
+          }
+          // (re)create the temp snapshot file; the chunk data is written below
+          out = new FileOutputStream(tmpSnapshotFile);
+        } else {
+          Preconditions.checkState(tmpSnapshotFile.exists()); // a later chunk requires the file from an earlier chunk
+          out = new FileOutputStream(tmpSnapshotFile, true); // opened in append mode
+          FileChannel fc = out.getChannel();
+          fc.position(chunk.getOffset()); // NOTE(review): in append mode writes go to the end of file regardless of position - confirm
+        }
+
+        // write data to the file
+        out.write(chunk.getData().toByteArray());
+      } finally {
+        RaftUtils.cleanup(null, out); // presumably closes out (null on the constructor-failure path) - verify against RaftUtils
+      }
+
+      // if this is the last chunk of the file, verify the md5 digest and create
+      // the md5 meta-file (the rename is done once for the whole tmp dir, below).
+      if (chunk.getDone()) {
+        final MD5Hash expectedDigest =
+            new MD5Hash(chunk.getFileDigest().toByteArray());
+        // calculate the checksum of the snapshot file and compare it with the
+        // file digest in the request
+        MD5Hash digest = MD5FileUtil.computeMd5ForFile(tmpSnapshotFile);
+        if (!digest.equals(expectedDigest)) {
+          LOG.warn("The snapshot md5 digest {} does not match expected {}",
+              digest, expectedDigest);
+          // rename the temp snapshot file to .corrupt
+//          NativeIO.renameTo(tmpSnapshotFile, // TODO:
+//              dir.getCorruptSnapshotFile(lastIncludedTerm, lastIncludedIndex));
+          throw new IOException("MD5 mismatch for snapshot-" + lastIncludedIndex
+              + " installation");
+        } else {
+          MD5FileUtil.saveMD5File(tmpSnapshotFile, digest);
+        }
+      }
+    }
+
+    if (request.getDone()) {
+      LOG.info("Install snapshot is done, renaming tnp dir:{} to:{}",
+          tmpDir, dir.getStateMachineDir());
+      dir.getStateMachineDir().delete(); // NOTE(review): result ignored; File.delete() fails on a non-empty directory
+      tmpDir.renameTo(dir.getStateMachineDir()); // NOTE(review): result ignored; a failed rename is silent
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java
new file mode 100644
index 0000000..397a12b
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.statemachine;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
+import org.apache.ratis.util.LifeCycle;
+
+/**
+ * Base implementation for StateMachines.
+ */
+public class BaseStateMachine implements StateMachine {
+
+  protected RaftProperties properties; // set by initialize()
+  protected RaftStorage storage; // set by initialize()
+  protected RaftConfiguration raftConf; // set by setRaftConfiguration()
+  protected final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName()); // renamed with the id in initialize()
+
+  @Override
+  public LifeCycle.State getLifeCycleState() {
+    return lifeCycle.getCurrentState();
+  }
+
+  @Override
+  public void initialize(String id, RaftProperties properties, RaftStorage storage)
+      throws IOException {
+    lifeCycle.setName(getClass().getSimpleName() + ":" + id); // "<simple class name>:<id>"
+    this.properties = properties;
+    this.storage = storage;
+  }
+
+  @Override
+  public void setRaftConfiguration(RaftConfiguration conf) {
+    this.raftConf = conf;
+  }
+
+  @Override
+  public RaftConfiguration getRaftConfiguration() {
+    return this.raftConf;
+  }
+
+  @Override
+  public SnapshotInfo getLatestSnapshot() {
+    return getStateMachineStorage().getLatestSnapshot(); // null with the default storage returned by this class
+  }
+
+  @Override
+  public void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException {
+    // do nothing
+  }
+
+  @Override
+  public void pause() { // no-op in this base class
+  }
+
+  @Override
+  public void reinitialize(String id, RaftProperties properties, RaftStorage storage) // no-op in this base class
+      throws IOException {
+  }
+
+  @Override
+  public TransactionContext applyTransactionSerial(TransactionContext trx) throws IOException {
+    return trx; // returns the argument unchanged
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) throws IOException {
+    // return the same message contained in the entry
+    Message msg = () -> trx.getLogEntry().get().getSmLogEntry().getData(); // evaluated lazily, when the Message is queried
+    return CompletableFuture.completedFuture(msg);
+  }
+
+  @Override
+  public long takeSnapshot() throws IOException {
+    return RaftServerConstants.INVALID_LOG_INDEX; // this base class takes no snapshot
+  }
+
+  @Override
+  public StateMachineStorage getStateMachineStorage() {
+    return new StateMachineStorage() { // a new no-op storage object is created on every call
+      @Override
+      public void init(RaftStorage raftStorage) throws IOException {
+      }
+
+      @Override
+      public SnapshotInfo getLatestSnapshot() {
+        return null; // this default storage never has a snapshot
+      }
+
+      @Override
+      public void format() throws IOException {
+      }
+    };
+  }
+
+  @Override
+  public CompletableFuture<RaftClientReply> query(
+      RaftClientRequest request) {
+    return null; // NOTE(review): returns null rather than a future; callers must null-check
+  }
+
+  @Override
+  public TransactionContext startTransaction(RaftClientRequest request)
+      throws IOException {
+    return new TransactionContext(this, request,
+        SMLogEntryProto.newBuilder()
+            .setData(request.getMessage().getContent()) // the log entry data is the request message content
+            .build());
+  }
+
+  @Override
+  public TransactionContext cancelTransaction(TransactionContext trx) throws IOException {
+    return trx; // returns the argument unchanged
+  }
+
+  @Override
+  public TransactionContext preAppendTransaction(TransactionContext trx) throws IOException {
+    return trx; // returns the argument unchanged
+  }
+
+  @Override
+  public void close() throws IOException {
+    // do nothing
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/FileListSnapshotInfo.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/FileListSnapshotInfo.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/FileListSnapshotInfo.java
new file mode 100644
index 0000000..1858603
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/FileListSnapshotInfo.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.FileInfo;
+
+/**
+ * Each snapshot has a list of files.
+ *
+ * The objects of this class are immutable.
+ */
+public class FileListSnapshotInfo implements SnapshotInfo {
+  private final TermIndex termIndex; // built from the (term, index) constructor arguments
+  private final List<FileInfo> files; // unmodifiable view of the list passed to the constructor
+
+  public FileListSnapshotInfo(List<FileInfo> files, long term, long index) {
+    this.termIndex = TermIndex.newTermIndex(term, index);
+    this.files = Collections.unmodifiableList(files); // NOTE(review): a view, not a copy; later changes to the caller's list stay visible
+  }
+
+  @Override
+  public TermIndex getTermIndex() {
+    return termIndex;
+  }
+
+  @Override
+  public long getTerm() {
+    return termIndex.getTerm();
+  }
+
+  @Override
+  public long getIndex() {
+    return termIndex.getIndex();
+  }
+
+  @Override
+  public List<FileInfo> getFiles() {
+    return files; // the unmodifiable view created in the constructor
+  }
+
+  @Override
+  public String toString() {
+    return termIndex + ":" + files; // "<termIndex>:<files>"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java
new file mode 100644
index 0000000..d417db7
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.ratis.io.MD5Hash;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.util.AtomicFileOutputStream;
+import org.apache.ratis.util.MD5FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A StateMachineStorage that stores the snapshot in a single file.
+ */
+public class SimpleStateMachineStorage implements StateMachineStorage {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SimpleStateMachineStorage.class);
+
+  static final String SNAPSHOT_FILE_PREFIX = "snapshot";
+  static final String CORRUPT_SNAPSHOT_FILE_SUFFIX = ".corrupt";
+  /** Matches snapshot file names of the form "snapshot.(term)_(index)"; group 1 is the term, group 2 the index. */
+  static final Pattern SNAPSHOT_REGEX =
+      Pattern.compile(SNAPSHOT_FILE_PREFIX + "\\.(\\d+)_(\\d+)");
+
+  private RaftStorage raftStorage; // NOTE(review): assigned in init() but not read anywhere in this class
+  private File smDir = null; // state machine directory; null until init() is called
+
+  private volatile SingleFileSnapshotInfo currentSnapshot = null; // result of the last loadLatestSnapshot(); may be null
+
+  public void init(RaftStorage raftStorage) throws IOException { // NOTE(review): no @Override here, unlike format() below
+    this.raftStorage = raftStorage;
+    this.smDir = raftStorage.getStorageDir().getStateMachineDir();
+    loadLatestSnapshot();
+  }
+
+  @Override
+  public void format() throws IOException {
+    // TODO
+  }
+
+  @VisibleForTesting
+  public static TermIndex getTermIndexFromSnapshotFile(File file) { // parses term and index out of the file name
+    final String name = file.getName();
+    final Matcher m = SNAPSHOT_REGEX.matcher(name);
+    if (!m.matches()) { // the whole name must match, so ".tmp"/".corrupt" variants are rejected
+      throw new IllegalArgumentException("File \"" + file
+          + "\" does not match snapshot file name pattern \""
+          + SNAPSHOT_REGEX + "\"");
+    }
+    final long term = Long.parseLong(m.group(1));
+    final long index = Long.parseLong(m.group(2));
+    return TermIndex.newTermIndex(term, index);
+  }
+
+  protected static String getTmpSnapshotFileName(long term, long endIndex) {
+    return getSnapshotFileName(term, endIndex) + AtomicFileOutputStream.TMP_EXTENSION;
+  }
+
+  protected static String getCorruptSnapshotFileName(long term, long endIndex) {
+    return getSnapshotFileName(term, endIndex) + CORRUPT_SNAPSHOT_FILE_SUFFIX;
+  }
+
+  public File getSnapshotFile(long term, long endIndex) {
+    return new File(smDir, getSnapshotFileName(term, endIndex)); // resolved against smDir, which is set by init()
+  }
+
+  protected File getTmpSnapshotFile(long term, long endIndex) {
+    return new File(smDir, getTmpSnapshotFileName(term, endIndex));
+  }
+
+  protected File getCorruptSnapshotFile(long term, long endIndex) {
+    return new File(smDir, getCorruptSnapshotFileName(term, endIndex));
+  }
+
+  public SingleFileSnapshotInfo findLatestSnapshot() throws IOException { // scans smDir; returns null when no file name matches
+    SingleFileSnapshotInfo latest = null;
+    try (DirectoryStream<Path> stream =
+             Files.newDirectoryStream(smDir.toPath())) {
+      for (Path path : stream) {
+        Matcher matcher = SNAPSHOT_REGEX.matcher(path.getFileName().toString());
+        if (matcher.matches()) {
+          final long endIndex = Long.parseLong(matcher.group(2));
+          if (latest == null || endIndex > latest.getIndex()) { // "latest" is decided by end index only; term is not compared
+            final long term = Long.parseLong(matcher.group(1));
+            MD5Hash fileDigest = MD5FileUtil.readStoredMd5ForFile(path.toFile());
+            final FileInfo fileInfo = new FileInfo(path, fileDigest);
+            latest = new SingleFileSnapshotInfo(fileInfo, term, endIndex);
+          }
+        }
+      }
+    }
+    return latest;
+  }
+
+  public void loadLatestSnapshot() throws IOException {
+    this.currentSnapshot = findLatestSnapshot(); // replaces the cached value, possibly with null
+  }
+
+  public static String getSnapshotFileName(long term, long endIndex) {
+    return SNAPSHOT_FILE_PREFIX + "." + term + "_" + endIndex; // the format SNAPSHOT_REGEX parses back
+  }
+
+  @Override
+  public SingleFileSnapshotInfo getLatestSnapshot() {
+    return currentSnapshot; // cached value; does not rescan the directory
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/SingleFileSnapshotInfo.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/SingleFileSnapshotInfo.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/SingleFileSnapshotInfo.java
new file mode 100644
index 0000000..5bca2c9
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/SingleFileSnapshotInfo.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.ratis.server.storage.FileInfo;
+
+/**
+ * A snapshot that consists of exactly one file.
+ *
+ * The objects of this class are immutable.
+ */
+public class SingleFileSnapshotInfo extends FileListSnapshotInfo {
+  /**
+   * @param fileInfo the single file that makes up the snapshot
+   * @param term the term passed to the superclass
+   * @param endIndex the index passed to the superclass
+   */
+  public SingleFileSnapshotInfo(FileInfo fileInfo, long term, long endIndex) {
+    // singletonList is immutable; Arrays.asList would still permit set(0, ...),
+    // which contradicts the immutability promised in the class documentation.
+    super(Collections.singletonList(fileInfo), term, endIndex);
+  }
+
+  /** @return the file associated with the snapshot. */
+  public FileInfo getFile() {
+    return getFiles().get(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java
new file mode 100644
index 0000000..f0aadd9
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import java.util.List;
+
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.FileInfo;
+
+/**
+ * SnapshotInfo represents a durable state saved by the state machine. The state machine
+ * implementation is responsible for the layout of the snapshot files as well as for making the
+ * data durable. The latest term, the latest index, and the raft configuration must be saved
+ * together with any data files in the snapshot.
+ */
+public interface SnapshotInfo {
+
+  /**
+   * Returns the term and index corresponding to this snapshot.
+   * @return The term and index corresponding to this snapshot.
+   */
+  TermIndex getTermIndex();
+
+  /**
+   * Returns the term corresponding to this snapshot.
+   * @return The term corresponding to this snapshot.
+   */
+  long getTerm();
+
+  /**
+   * Returns the index corresponding to this snapshot.
+   * @return The index corresponding to this snapshot.
+   */
+  long getIndex();
+
+  /**
+   * Returns a list of files corresponding to this snapshot. This list should include all
+   * the files that the state machine keeps in its data directory. These files will be
+   * copied to other replicas in install snapshot RPCs.
+   * @return a list of files corresponding to this snapshot.
+   */
+  List<FileInfo> getFiles();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
new file mode 100644
index 0000000..e377aa7
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.util.LifeCycle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * StateMachine is the entry point for the custom implementation of replicated state as defined in
+ * the "State Machine Approach" in the literature
+ * (see https://en.wikipedia.org/wiki/State_machine_replication).
+ */
+public interface StateMachine extends Closeable {
+  /**
+   * Initializes the State Machine with the given properties and storage. The state machine is
+   * responsible for reading the latest snapshot from the file system (if any) and initializing
+   * itself with the latest term and index there, including all the edits.
+   *
+   * @param id an identifier supplied by the caller (presumably the raft server id; confirm)
+   * @param properties the properties to initialize with
+   * @param storage the raft storage to initialize with
+   * @throws IOException if the initialization fails
+   */
+  void initialize(String id, RaftProperties properties, RaftStorage storage)
+      throws IOException;
+
+  /**
+   * Returns the lifecycle state for this StateMachine.
+   * @return the lifecycle state.
+   */
+  LifeCycle.State getLifeCycleState();
+
+  /**
+   * Pauses the state machine. On return, the state machine should have closed all open files so
+   * that a new snapshot can be installed.
+   */
+  void pause();
+
+  /**
+   * Re-initializes the State Machine in PAUSED state with the given properties and storage. The
+   * state machine is responsible for reading the latest snapshot from the file system (if any)
+   * and initializing itself with the latest term and index there, including all the edits.
+   *
+   * @param id an identifier supplied by the caller (presumably the raft server id; confirm)
+   * @param properties the properties to re-initialize with
+   * @param storage the raft storage to re-initialize with
+   * @throws IOException if the re-initialization fails
+   */
+  void reinitialize(String id, RaftProperties properties, RaftStorage storage)
+      throws IOException;
+
+  /**
+   * Dump the in-memory state into a snapshot file in the RaftStorage. The
+   * StateMachine implementation can decide 1) its own snapshot format, 2) when
+   * a snapshot is taken, and 3) how the snapshot is taken (e.g., whether the
+   * snapshot blocks the state machine, and whether to purge log entries after
+   * a snapshot is done).
+   *
+   * In the meantime, when the size of the raft log outside of the latest snapshot
+   * exceeds a certain threshold, the RaftServer may choose to trigger a snapshot
+   * if {@link RaftServerConfigKeys#RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY} is
+   * enabled.
+   *
+   * The snapshot should include the latest raft configuration.
+   *
+   * @return the largest index of the log entry that has been applied to the
+   *         state machine and also included in the snapshot. Note the log purge
+   *         should be handled separately.
+   * @throws IOException if taking the snapshot fails
+   */
+  // TODO: refactor this
+  long takeSnapshot() throws IOException;
+
+  /**
+   * Record the RaftConfiguration in the state machine. The RaftConfiguration
+   * should also be stored in the snapshot.
+   *
+   * @param conf the raft configuration to record
+   */
+  void setRaftConfiguration(RaftConfiguration conf);
+
+  /**
+   * @return the latest raft configuration recorded in the state machine.
+   */
+  RaftConfiguration getRaftConfiguration();
+
+  /**
+   * @return StateMachineStorage to interact with the durability guarantees provided by the
+   * state machine.
+   */
+  StateMachineStorage getStateMachineStorage();
+
+  /**
+   * Returns the information for the latest durable snapshot.
+   *
+   * @return the latest durable snapshot
+   */
+  SnapshotInfo getLatestSnapshot();
+
+  /**
+   * Query the state machine. The request must be read-only.
+   * TODO: extend RaftClientRequest to have a read-only request subclass.
+   *
+   * @param request the read-only client request
+   * @return a future of the reply to the query
+   */
+  CompletableFuture<RaftClientReply> query(RaftClientRequest request);
+
+  /**
+   * Validate/pre-process the incoming update request in the state machine.
+   *
+   * @param request the incoming update request
+   * @return the content to be written to the log entry. Null means the request
+   * should be rejected.
+   * @throws IOException thrown by the state machine during validation
+   */
+  TransactionContext startTransaction(RaftClientRequest request)
+      throws IOException;
+
+  /**
+   * This is called before the transaction passed from the StateMachine is appended to the raft log.
+   * This method will be called from log append, in the same strict serial order that the
+   * transactions will have in the RAFT log. Since this is called serially in the critical path of
+   * log append, it is important to do only required operations here.
+   *
+   * @param trx the transaction about to be appended
+   * @return The Transaction context.
+   */
+  TransactionContext preAppendTransaction(TransactionContext trx) throws IOException;
+
+  /**
+   * Called to notify the state machine that the Transaction passed cannot be appended (or synced).
+   * The exception field will indicate whether there was an exception or not.
+   * @param trx the transaction to cancel
+   * @return cancelled transaction
+   */
+  TransactionContext cancelTransaction(TransactionContext trx) throws IOException;
+
+  /**
+   * Called for transactions that have been committed to the RAFT log. This step is called
+   * sequentially, in the strict serial order in which the transactions have been committed in
+   * the log. The SM is expected to do only necessary work, and leave the actual apply operation
+   * to the applyTransaction calls that can happen concurrently.
+   * @param trx the transaction state including the log entry that has been committed to a quorum
+   *            of the raft peers
+   * @return The Transaction context.
+   */
+  TransactionContext applyTransactionSerial(TransactionContext trx) throws IOException;
+
+  /**
+   * Apply a committed log entry to the state machine. This method can be called concurrently with
+   * the other calls, and there is no guarantee that the calls will be ordered according to the
+   * log commit order.
+   * @param trx the transaction state including the log entry that has been committed to a quorum
+   *            of the raft peers
+   * @return a future of the message produced by applying the transaction
+   */
+  // TODO: We do not need to return CompletableFuture
+  CompletableFuture<Message> applyTransaction(TransactionContext trx) throws IOException;
+
+  /**
+   * Notify the state machine that the raft peer is no longer leader.
+   *
+   * @param pendingEntries transaction contexts handed over with the notification
+   *                       (presumably the entries still pending at the old leader; confirm)
+   */
+  void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
new file mode 100644
index 0000000..4f7951a
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import java.io.IOException;
+
+import org.apache.ratis.server.storage.RaftStorage;
+
+/**
+ * Storage abstraction owned by a state machine; it exposes the latest durable snapshot.
+ */
+public interface StateMachineStorage {
+
+  /**
+   * Initializes this storage with the given raft storage.
+   *
+   * @param raftStorage the raft storage to initialize from
+   * @throws IOException if the initialization fails
+   */
+  void init(RaftStorage raftStorage) throws IOException;
+
+  /**
+   * Returns the information for the latest durable snapshot.
+   *
+   * @return the latest durable snapshot
+   */
+  SnapshotInfo getLatestSnapshot();
+
+  // TODO: StateMachine can decide to compact the files independently of concurrent install snapshot
+  // etc requests. We should have ref counting for the SnapshotInfo with a release mechanism
+  // so that raft server will release the files after the snapshot file copy in case a compaction
+  // is waiting for deleting these files.
+
+  /**
+   * Formats this storage.
+   * NOTE(review): presumably this discards existing state machine data; confirm against the
+   * implementations.
+   *
+   * @throws IOException if formatting fails
+   */
+  void format() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
new file mode 100644
index 0000000..81bea45
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Optional;
+
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
+
+/**
+ * Context for a transaction.
+ * The transaction might have originated from a client request, or it
+ * may be coming from another replica of the state machine through the RAFT log.
+ * {@link TransactionContext} can be created from
+ * either the {@link StateMachine} or the state machine updater.
+ *
+ * In the first case, the {@link StateMachine} is a leader. When it receives
+ * a {@link StateMachine#startTransaction(RaftClientRequest)} request, it returns
+ * a {@link TransactionContext} with the changes from the {@link StateMachine}.
+ * The same context will be passed back to the {@link StateMachine}
+ * via the {@link StateMachine#applyTransaction(TransactionContext)} call
+ * or the {@link StateMachine#notifyNotLeader(Collection)} call.
+ *
+ * In the second case, the {@link StateMachine} is a follower.
+ * The {@link TransactionContext} will be a committed entry coming from
+ * the RAFT log from the leader.
+ */
+public class TransactionContext {
+
+  /** The {@link StateMachine} that originated the transaction. */
+  private final StateMachine stateMachine;
+
+  /** Original request from the client */
+  private Optional<RaftClientRequest> clientRequest = Optional.empty();
+
+  /** Exception from the {@link StateMachine} or from the log */
+  private Optional<Exception> exception = Optional.empty();
+
+  /** Data from the {@link StateMachine} */
+  private Optional<SMLogEntryProto> smLogEntryProto = Optional.empty();
+
+  /**
+   * Context specific to the state machine.
+   * The {@link StateMachine} can use this object to carry state between
+   * {@link StateMachine#startTransaction(RaftClientRequest)} and
+   * {@link StateMachine#applyTransaction(TransactionContext)}.
+   */
+  private Optional<Object> stateMachineContext = Optional.empty();
+
+  /**
+   * Whether to commit the transaction to the RAFT Log.
+   * In some cases the {@link StateMachine} may want to indicate
+   * that the transaction should not be committed
+   */
+  private boolean shouldCommit = true;
+
+  /** Committed LogEntry. */
+  private Optional<LogEntryProto> logEntry = Optional.empty();
+
+  /** Common initialization: only records the owning state machine; all optionals stay empty. */
+  private TransactionContext(StateMachine stateMachine) {
+    this.stateMachine = stateMachine;
+  }
+
+  /** The same as this(stateMachine, clientRequest, smLogEntryProto, null). */
+  public TransactionContext(
+      StateMachine stateMachine, RaftClientRequest clientRequest,
+      SMLogEntryProto smLogEntryProto) {
+    this(stateMachine, clientRequest, smLogEntryProto, null);
+  }
+
+  /**
+   * Construct a {@link TransactionContext} from a client request.
+   * Used by the state machine to start a transaction
+   * and send the Log entry representing the transaction data
+   * to be applied to the raft log.
+   *
+   * @param clientRequest must not be null (wrapped with {@link Optional#of})
+   * @param smLogEntryProto may be null
+   * @param stateMachineContext may be null
+   */
+  public TransactionContext(
+      StateMachine stateMachine, RaftClientRequest clientRequest,
+      SMLogEntryProto smLogEntryProto, Object stateMachineContext) {
+    this(stateMachine);
+    this.clientRequest = Optional.of(clientRequest);
+    this.smLogEntryProto = Optional.ofNullable(smLogEntryProto);
+    this.stateMachineContext = Optional.ofNullable(stateMachineContext);
+  }
+
+  /** The same as this(stateMachine, clientRequest, exception, null). */
+  public TransactionContext(
+      StateMachine stateMachine, RaftClientRequest clientRequest,
+      Exception exception) {
+    this(stateMachine, clientRequest, exception, null);
+  }
+
+  /**
+   * Construct a {@link TransactionContext} from a client request to signal
+   * an exception so that the RAFT server will fail the request on behalf
+   * of the {@link StateMachine}.
+   *
+   * @param clientRequest must not be null (wrapped with {@link Optional#of})
+   * @param exception must not be null (wrapped with {@link Optional#of})
+   * @param stateMachineContext may be null
+   */
+  public TransactionContext(
+      StateMachine stateMachine, RaftClientRequest clientRequest,
+      Exception exception, Object stateMachineContext) {
+    this(stateMachine);
+    this.clientRequest = Optional.of(clientRequest);
+    this.exception = Optional.of(exception);
+    this.stateMachineContext = Optional.ofNullable(stateMachineContext);
+  }
+
+  /**
+   * Construct a {@link TransactionContext} from a {@link LogEntryProto}.
+   * Used by followers for applying committed entries to the state machine.
+   * The state machine data is taken from {@code logEntry.getSmLogEntry()}.
+   * @param logEntry the log entry to be applied
+   */
+  public TransactionContext(StateMachine stateMachine, LogEntryProto logEntry) {
+    this(stateMachine);
+    this.smLogEntryProto = Optional.of(logEntry.getSmLogEntry());
+    this.logEntry = Optional.of(logEntry);
+  }
+
+  /** @return the original client request; empty when this context was built from a log entry. */
+  public Optional<RaftClientRequest> getClientRequest() {
+    return this.clientRequest;
+  }
+
+  /** @return the data from the {@link StateMachine}, if any. */
+  public Optional<SMLogEntryProto> getSMLogEntry() {
+    return this.smLogEntryProto;
+  }
+
+  /** @return the exception recorded in this context, if any. */
+  public Optional<Exception> getException() {
+    return this.exception;
+  }
+
+  /**
+   * Replaces the state-machine-specific context.
+   * @param stateMachineContext the new context; null clears it
+   * @return this object
+   */
+  public TransactionContext setStateMachineContext(Object stateMachineContext) {
+    this.stateMachineContext = Optional.ofNullable(stateMachineContext);
+    return this;
+  }
+
+  /** @return the state-machine-specific context, if any. */
+  public Optional<Object> getStateMachineContext() {
+    return stateMachineContext;
+  }
+
+  /**
+   * Sets the committed log entry.
+   * @param logEntry must not be null (wrapped with {@link Optional#of})
+   * @return this object
+   */
+  public TransactionContext setLogEntry(LogEntryProto logEntry) {
+    this.logEntry = Optional.of(logEntry);
+    return this;
+  }
+
+  /**
+   * Sets the data from the {@link StateMachine}.
+   * @param smLogEntryProto must not be null (wrapped with {@link Optional#of}); note that the
+   *                        constructors, in contrast, accept null for this value
+   * @return this object
+   */
+  public TransactionContext setSmLogEntryProto(SMLogEntryProto smLogEntryProto) {
+    this.smLogEntryProto = Optional.of(smLogEntryProto);
+    return this;
+  }
+
+  /** @return the committed log entry, if any. */
+  public Optional<LogEntryProto> getLogEntry() {
+    return logEntry;
+  }
+
+  /**
+   * Records an exception; asserts that none has been recorded before.
+   * NOTE(review): private and not called anywhere within this class.
+   */
+  private TransactionContext setException(IOException ioe) {
+    assert !this.exception.isPresent();
+    this.exception = Optional.of(ioe);
+    return this;
+  }
+
+  /**
+   * Sets whether the transaction should be committed to the RAFT log.
+   * @return this object
+   */
+  public TransactionContext setShouldCommit(boolean shouldCommit) {
+    this.shouldCommit = shouldCommit;
+    return this;
+  }
+
+  /** @return whether the transaction should be committed to the RAFT log (true by default). */
+  public boolean shouldCommit() {
+    // TODO: Hook this up in the server to bypass the RAFT Log and send back a response to client
+    return this.shouldCommit;
+  }
+
+  // proxy StateMachine methods. We do not want to expose the SM to the RaftLog
+
+  /**
+   * This is called before the transaction passed from the StateMachine is appended to the raft log.
+   * This method will be called from log append, in the same strict serial order that the
+   * Transactions will have in the RAFT log. Since this is called serially in the critical path of
+   * log append, it is important to do only required operations here.
+   * Delegates to {@link StateMachine#preAppendTransaction(TransactionContext)}.
+   * @return The Transaction context.
+   */
+  public TransactionContext preAppendTransaction() throws IOException {
+    return stateMachine.preAppendTransaction(this);
+  }
+
+  /**
+   * Called to notify the state machine that the Transaction passed cannot be appended (or synced).
+   * The exception field will indicate whether there was an exception or not.
+   * Delegates to {@link StateMachine#cancelTransaction(TransactionContext)}.
+   * @return cancelled transaction
+   */
+  public TransactionContext cancelTransaction() throws IOException {
+    // TODO: This is not called from Raft server / log yet. When an IOException happens, we should
+    // call this to let the SM know that Transaction cannot be synced
+    return stateMachine.cancelTransaction(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
new file mode 100644
index 0000000..60cbb9c
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.client.impl.RaftClientImpl;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
+import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.storage.MemoryRaftLog;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.statemachine.BaseStateMachine;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.ExitUtils;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public abstract class MiniRaftCluster {
+  /** Logger shared by this class; public, so other classes can use it too. */
+  public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class);
+  /** Delay injection registered under the {@code RaftLog.LOG_SYNC} injection point. */
+  public static final DelayLocalExecutionInjection logSyncDelay =
+      new DelayLocalExecutionInjection(RaftLog.LOG_SYNC);
+
+  /** Simple class name, used as the prefix of the property keys below. */
+  public static final String CLASS_NAME = MiniRaftCluster.class.getSimpleName();
+  /** Property key naming the {@link StateMachine} class that test servers instantiate. */
+  public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class";
+  /** State machine class used when {@link #STATEMACHINE_CLASS_KEY} is not set. */
+  public static final Class<? extends StateMachine> STATEMACHINE_CLASS_DEFAULT = BaseStateMachine.class;
+
+  /** Creates clusters of type CLUSTER, either from explicit server ids or from a server count. */
+  public static abstract class Factory<CLUSTER extends MiniRaftCluster> {
+    /** Creates a new cluster with one server per given id. */
+    public abstract CLUSTER newCluster(
+        String[] ids, RaftProperties prop, boolean formatted)
+        throws IOException;
+
+    /** Creates a new cluster whose server ids are {@code generateIds(numServer, 0)}. */
+    public CLUSTER newCluster(
+        int numServer, RaftProperties prop, boolean formatted)
+        throws IOException {
+      final String[] serverIds = generateIds(numServer, 0);
+      return newCluster(serverIds, prop, formatted);
+    }
+  }
+
+  /**
+   * A {@link MiniRaftCluster} that sets up RPC one peer at a time through
+   * {@link #setPeerRpc(RaftPeer)}.
+   */
+  public static abstract class RpcBase extends MiniRaftCluster {
+    public RpcBase(String[] ids, RaftProperties properties, boolean formatted) {
+      super(ids, properties, formatted);
+    }
+
+    /** Sets up RPC for a single peer and returns the server that belongs to it. */
+    protected abstract RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException;
+
+    /** Applies {@link #setPeerRpc(RaftPeer)} to every peer of the current configuration. */
+    @Override
+    protected void setPeerRpc() throws IOException {
+      for (final RaftPeer peer : conf.getPeers()) {
+        setPeerRpc(peer);
+      }
+    }
+
+    /** Restarts the server as the superclass does, then sets up its RPC and starts it. */
+    @Override
+    public void restartServer(String id, boolean format) throws IOException {
+      super.restartServer(id, format);
+      final RaftServerImpl restarted = setPeerRpc(conf.getPeer(id));
+      restarted.start();
+    }
+
+    /** Delegates to {@code RaftTestUtil.setBlockRequestsFrom(src, block)}. */
+    @Override
+    public void setBlockRequestsFrom(String src, boolean block) {
+      RaftTestUtil.setBlockRequestsFrom(src, block);
+    }
+  }
+
+  /** Plain holder describing a membership change: the full new peer set, additions and removals. */
+  public static class PeerChanges {
+    /** All peers contained in the new configuration. */
+    public final RaftPeer[] allPeersInNewConf;
+    /** The peers that were added. */
+    public final RaftPeer[] newPeers;
+    /** The peers that were removed. */
+    public final RaftPeer[] removedPeers;
+
+    /** Stores the given arrays as-is (no defensive copies are made). */
+    public PeerChanges(RaftPeer[] all, RaftPeer[] newPeers, RaftPeer[] removed) {
+      this.allPeersInNewConf = all;
+      this.newPeers = newPeers;
+      this.removedPeers = removed;
+    }
+  }
+
+  /**
+   * Builds a {@link RaftConfiguration} that contains one {@link RaftPeer} per given id.
+   *
+   * @param ids the peer ids
+   * @return the built configuration
+   */
+  public static RaftConfiguration initConfiguration(String[] ids) {
+    final List<RaftPeer> peers =
+        Arrays.stream(ids).map(RaftPeer::new).collect(Collectors.toList());
+    return RaftConfiguration.newBuilder().setConf(peers).build();
+  }
+
+  private static String getBaseDirectory() {
+    return System.getProperty("test.build.data", "target/test/data") + "/raft/";
+  }
+
+  /**
+   * Deletes the given directory via {@code FileUtils.fullyDelete} and fails with an
+   * IllegalStateException (from {@code Preconditions.checkState}) if the deletion reports failure.
+   */
+  private static void formatDir(String dirStr) {
+    final boolean deleted = FileUtils.fullyDelete(new File(dirStr));
+    Preconditions.checkState(deleted,
+        "Failed to format directory %s", dirStr);
+    LOG.info("Formatted directory {}", dirStr);
+  }
+
+  /**
+   * Generates the ids "s&lt;base&gt;", "s&lt;base+1&gt;", ..., one per server.
+   *
+   * @param numServers the number of ids to generate
+   * @param base the number used in the first id
+   * @return an array of {@code numServers} ids
+   */
+  public static String[] generateIds(int numServers, int base) {
+    final String[] generated = new String[numServers];
+    Arrays.setAll(generated, offset -> "s" + (offset + base));
+    return generated;
+  }
+
+  /** Current configuration of the cluster; replaced when peers are initialized or added. */
+  protected RaftConfiguration conf;
+  /** Private copy of the properties passed to the constructor. */
+  protected final RaftProperties properties;
+  /** Prefix of every server's storage directory (the server id is appended to it). */
+  private final String testBaseDir;
+  /** Servers by id, in insertion order; wrapped in a synchronized map. */
+  protected final Map<String, RaftServerImpl> servers =
+      Collections.synchronizedMap(new LinkedHashMap<>());
+
+  /**
+   * Creates one server per id (without starting any of them) and disables System.exit
+   * through {@code ExitUtils.disableSystemExit()}.
+   *
+   * @param ids the server ids
+   * @param properties the properties; copied, so the caller's object is not modified here
+   * @param formatted whether each server's storage directory is formatted first
+   */
+  public MiniRaftCluster(String[] ids, RaftProperties properties,
+      boolean formatted) {
+    this.conf = initConfiguration(ids);
+    this.properties = new RaftProperties(properties);
+    this.testBaseDir = getBaseDirectory();
+
+    for (RaftPeer peer : conf.getPeers()) {
+      servers.put(peer.getId(), newRaftServer(peer.getId(), conf, formatted));
+    }
+
+    ExitUtils.disableSystemExit();
+  }
+
+  /**
+   * Rebuilds the configuration from the given peers and hands every matching server
+   * the new configuration together with its RPC service.
+   *
+   * @param peers the RPC service to use for each peer; every peer id must belong to
+   *              a server already held by this cluster
+   */
+  protected <RPC extends  RaftServerRpc> void init(Map<RaftPeer, RPC> peers) {
+    // parameterized logging, consistent with the other log statements in this class
+    LOG.info("peers = {}", peers.keySet());
+    conf = RaftConfiguration.newBuilder().setConf(peers.keySet()).build();
+    for (Map.Entry<RaftPeer, RPC> entry : peers.entrySet()) {
+      final RaftServerImpl server = servers.get(entry.getKey().getId());
+      server.setInitialConf(conf);
+      server.setServerRpc(entry.getValue());
+    }
+  }
+
+  /** Starts every server currently held by this cluster. */
+  public void start() {
+    // parameterized logging, consistent with the other log statements in this class
+    LOG.info("Starting {}", getClass().getSimpleName());
+    servers.values().forEach(RaftServerImpl::start);
+  }
+
+  /**
+   * Kills the server with the given id and replaces it with a newly created one.
+   * The new server is not started here.
+   *
+   * @param id the id of the server to replace
+   * @param format whether the storage directory is formatted before the server is recreated
+   */
+  public void restartServer(String id, boolean format) throws IOException {
+    killServer(id);
+    // remove first, so that the re-inserted entry moves to the end of the insertion order
+    servers.remove(id);
+    final RaftServerImpl replacement = newRaftServer(id, conf, format);
+    servers.put(id, replacement);
+  }
+
+  /**
+   * Closes all servers that are alive, recreates every server, sets up the peer RPC
+   * again and starts the cluster.
+   *
+   * @param format whether the storage directories are formatted before the servers are recreated
+   */
+  public final void restart(boolean format) throws IOException {
+    for (RaftServerImpl server : servers.values()) {
+      if (server.isAlive()) {
+        server.close();
+      }
+    }
+    // iterate over a copy of the ids since the map is modified inside the loop
+    for (String id : new ArrayList<>(servers.keySet())) {
+      servers.remove(id);
+      servers.put(id, newRaftServer(id, conf, format));
+    }
+
+    setPeerRpc();
+    start();
+  }
+
+  /** Sets up RPC for the servers of this cluster; called by restart(boolean) before start(). */
+  protected abstract void setPeerRpc() throws IOException;
+
+  /**
+   * @return the value configured under
+   *         {@code RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY}, or the
+   *         corresponding default if it is not set.
+   */
+  public int getMaxTimeout() {
+    return properties.getInt(
+        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY,
+        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT);
+  }
+
+  /** @return the configuration currently held by this cluster. */
+  public RaftConfiguration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Creates a server whose storage directory is {@code testBaseDir + id}.
+   * Note that the storage directory key is written into the shared {@code properties}.
+   *
+   * @param id the server id
+   * @param conf the configuration passed to the server
+   * @param format whether the storage directory is formatted first
+   * @return the new server, not started
+   * @throws RuntimeException wrapping any IOException raised during creation
+   */
+  private RaftServerImpl newRaftServer(String id, RaftConfiguration conf,
+                                       boolean format) {
+    final String dirStr = testBaseDir + id;
+    try {
+      if (format) {
+        formatDir(dirStr);
+      }
+      properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, dirStr);
+      return new RaftServerImpl(id, conf, properties, getStateMachine4Test(properties));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  static StateMachine getStateMachine4Test(RaftProperties properties) {
+    final Class<? extends StateMachine> smClass = properties.getClass(
+        STATEMACHINE_CLASS_KEY,
+        STATEMACHINE_CLASS_DEFAULT,
+        StateMachine.class);
+    return RaftUtils.newInstance(smClass);
+  }
+
+  /** @return a request sender for clients of this cluster; supplied by the concrete cluster. */
+  public abstract RaftClientRequestSender getRaftClientRequestSender();
+
+  /**
+   * Assigns the given RPC services to the already registered servers of the new peers
+   * and optionally starts the new servers.
+   *
+   * @param newPeers the RPC service to use for each new peer
+   * @param newServers the servers started when {@code startService} is true
+   * @param startService whether to start the new servers
+   * @return a new list containing the new peers
+   */
+  protected <RPC extends RaftServerRpc> Collection<RaftPeer> addNewPeers(
+      Map<RaftPeer, RPC> newPeers, Collection<RaftServerImpl> newServers,
+      boolean startService) throws IOException {
+    newPeers.forEach((peer, rpc) -> servers.get(peer.getId()).setServerRpc(rpc));
+    if (startService) {
+      for (RaftServerImpl added : newServers) {
+        added.start();
+      }
+    }
+    return new ArrayList<>(newPeers.keySet());
+  }
+
+  /**
+   * RPC-specific hook invoked by addNewPeers(String[], boolean) after the new servers
+   * have been created and registered.
+   *
+   * @return the new peers; the caller uses the returned collection in place of {@code newPeers}
+   */
+  protected abstract Collection<RaftPeer> addNewPeers(
+      Collection<RaftPeer> newPeers, Collection<RaftServerImpl> newServers,
+      boolean startService) throws IOException;
+
+  /**
+   * Adds {@code number} new peers whose ids are generated with the current number of
+   * servers as the base.
+   */
+  public PeerChanges addNewPeers(int number, boolean startNewPeer)
+      throws IOException {
+    final String[] newIds = generateIds(number, servers.size());
+    return addNewPeers(newIds, startNewPeer);
+  }
+
+  /**
+   * Creates one new server per id, registers it in {@code servers}, and
+   * replaces {@code conf} by a configuration consisting of the new peers
+   * followed by the peers of the previous configuration.
+   *
+   * @param ids the ids of the servers to add; none of them may already be
+   *            registered, and the array must not contain duplicates
+   * @param startNewPeer whether the new servers should be started
+   * @return the peer changes: all peers of the new configuration, the newly
+   *         added peers, and an empty array of removed peers
+   * @throws IOException if the subclass hook fails to set up the new peers
+   */
+  public PeerChanges addNewPeers(String[] ids,
+      boolean startNewPeer) throws IOException {
+    LOG.info("Add new peers {}", Arrays.asList(ids));
+    Collection<RaftPeer> newPeers = new ArrayList<>(ids.length);
+    for (String id : ids) {
+      newPeers.add(new RaftPeer(id));
+    }
+
+    // create and add new RaftServers
+    final List<RaftServerImpl> newServers = new ArrayList<>(ids.length);
+    for (RaftPeer p : newPeers) {
+      // Reject an already registered id BEFORE creating the server:
+      // newRaftServer(.., true) calls formatDir on the directory derived from
+      // the id, which must not happen for a server that already exists.
+      Preconditions.checkArgument(!servers.containsKey(p.getId()));
+      RaftServerImpl newServer = newRaftServer(p.getId(), conf, true);
+      servers.put(p.getId(), newServer);
+      newServers.add(newServer);
+    }
+
+    // for hadoop-rpc-enabled peer, we assign inetsocketaddress here
+    newPeers = addNewPeers(newPeers, newServers, startNewPeer);
+
+    // snapshot the added peers before the old peers are appended
+    final RaftPeer[] np = newPeers.toArray(new RaftPeer[newPeers.size()]);
+    newPeers.addAll(conf.getPeers());
+    conf = RaftConfiguration.newBuilder().setConf(newPeers).setLogEntryIndex(0).build();
+    RaftPeer[] p = newPeers.toArray(new RaftPeer[newPeers.size()]);
+    return new PeerChanges(p, np, new RaftPeer[0]);
+  }
+
+  public void startServer(String id) { // starts the registered server with the given id
+    RaftServerImpl server = servers.get(id);
+    assert server != null; // only checked when assertions (-ea) are enabled
+    server.start();
+  }
+
+  private RaftPeer getPeer(RaftServerImpl s) { // builds a peer from the server's id and the address reported by its RPC object
+    return new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress());
+  }
+
+  /**
+   * Prepares the peer list for removing some peers from the configuration and
+   * stores the resulting configuration in {@code conf}.
+   *
+   * @param number the number of peers to remove, counting the leader when
+   *               {@code removeLeader} is true; fewer peers are removed when
+   *               there are not enough eligible followers
+   * @param removeLeader whether the current leader is among the removed peers
+   * @param excluded followers that must not be removed
+   * @return the peer changes: the remaining peers, an empty array of new
+   *         peers, and the removed peers
+   */
+  public PeerChanges removePeers(int number, boolean removeLeader,
+      Collection<RaftPeer> excluded) {
+    final Collection<RaftPeer> remaining = new ArrayList<>(conf.getPeers());
+    final List<RaftPeer> removedPeers = new ArrayList<>(number);
+    if (removeLeader) {
+      final RaftPeer leader = getPeer(getLeader());
+      assert !excluded.contains(leader);
+      remaining.remove(leader);
+      removedPeers.add(leader);
+    }
+
+    final List<RaftServerImpl> followers = getFollowers();
+    // the leader, when removed, already accounts for one of the removals
+    final int quota = removeLeader ? number - 1 : number;
+    int removed = 0;
+    for (RaftServerImpl follower : followers) {
+      if (removed >= quota) {
+        break;
+      }
+      final RaftPeer candidate = getPeer(follower);
+      if (excluded.contains(candidate)) {
+        continue;
+      }
+      remaining.remove(candidate);
+      removedPeers.add(candidate);
+      removed++;
+    }
+
+    conf = RaftConfiguration.newBuilder().setConf(remaining).setLogEntryIndex(0).build();
+    return new PeerChanges(
+        remaining.toArray(new RaftPeer[remaining.size()]),
+        new RaftPeer[0],
+        removedPeers.toArray(new RaftPeer[removedPeers.size()]));
+  }
+
+  public void killServer(String id) { // closes the server; it stays registered in servers
+    servers.get(id).close(); // no null check here, unlike startServer(..)
+  }
+
+  /**
+   * @return a multi-line string with the number of servers followed by one
+   *         indented line per server
+   */
+  public String printServers() {
+    final StringBuilder out = new StringBuilder("\n#servers = " + servers.size() + "\n");
+    for (RaftServerImpl server : servers.values()) {
+      out.append("  ").append(server).append("\n");
+    }
+    return out.toString();
+  }
+
+  /**
+   * Like {@link #printServers()}, but additionally appends the entry string
+   * of the log of every server whose log is a {@link MemoryRaftLog}.
+   *
+   * @return the servers, and their in-memory logs, as a multi-line string
+   */
+  public String printAllLogs() {
+    final StringBuilder out = new StringBuilder("\n#servers = " + servers.size() + "\n");
+    for (RaftServerImpl server : servers.values()) {
+      out.append("  ").append(server).append("\n");
+
+      final RaftLog log = server.getState().getLog();
+      if (!(log instanceof MemoryRaftLog)) {
+        continue; // other log implementations are not dumped
+      }
+      out.append("    ").append(((MemoryRaftLog) log).getEntryString());
+    }
+    return out.toString();
+  }
+
+  /**
+   * Finds the current leader among the alive servers. When several alive
+   * servers claim to be leader, only those with the highest term are
+   * considered.
+   *
+   * @return the leader, or null if no alive server is a leader
+   * @throws AssertionError (via {@code Assert.fail}) if more than one alive
+   *         server is leader in the highest term
+   */
+  public RaftServerImpl getLeader() {
+    final List<RaftServerImpl> leaders = new ArrayList<>();
+    for (RaftServerImpl s : servers.values()) {
+      if (!s.isAlive() || !s.isLeader()) {
+        continue;
+      }
+      if (!leaders.isEmpty()) {
+        final long leaderTerm = leaders.get(0).getState().getCurrentTerm();
+        final long term = s.getState().getCurrentTerm();
+        if (term < leaderTerm) {
+          continue; // a leader of an older term
+        }
+        if (term > leaderTerm) {
+          leaders.clear(); // everything collected so far is of an older term
+        }
+      }
+      leaders.add(s);
+    }
+
+    if (leaders.isEmpty()) {
+      return null;
+    }
+    if (leaders.size() != 1) {
+      Assert.fail(printServers() + leaders.toString()
+          + "leaders.size() = " + leaders.size() + " != 1");
+    }
+    return leaders.get(0);
+  }
+
+  public boolean isLeader(String leaderId) throws InterruptedException { // true iff getLeader() finds a leader with this id
+    final RaftServerImpl leader = getLeader();
+    return leader != null && leader.getId().equals(leaderId);
+  }
+
+  /** @return a new list of the servers that are alive and are followers */
+  public List<RaftServerImpl> getFollowers() {
+    final List<RaftServerImpl> followers = new ArrayList<>();
+    for (RaftServerImpl s : servers.values()) {
+      if (s.isAlive() && s.isFollower()) {
+        followers.add(s);
+      }
+    }
+    return followers;
+  }
+
+  public Collection<RaftServerImpl> getServers() { // all registered servers, alive or not
+    return servers.values(); // returned as-is, not copied
+  }
+
+  public RaftServerImpl getServer(String id) { // plain lookup in servers by id
+    return servers.get(id);
+  }
+
+  /**
+   * @return a new collection with one peer (id and RPC address) for every
+   *         registered server
+   */
+  public Collection<RaftPeer> getPeers() {
+    return getServers().stream()
+        .map(this::getPeer)
+        .collect(Collectors.toList());
+  }
+
+  public RaftClient createClient(String clientId, String leaderId) { // client over the peers of the current conf; leaderId is passed through unchanged
+    return new RaftClientImpl(clientId, conf.getPeers(),
+        getRaftClientRequestSender(), leaderId, properties);
+  }
+
+  /**
+   * Closes every server that is still alive.
+   *
+   * @throws AssertionError if {@code ExitUtils} reports that an exit was
+   *         triggered while the cluster was in use
+   */
+  public void shutdown() {
+    LOG.info("Stopping " + getClass().getSimpleName());
+    for (RaftServerImpl server : servers.values()) {
+      if (server.isAlive()) {
+        server.close();
+      }
+    }
+
+    if (!ExitUtils.isTerminated()) {
+      return;
+    }
+    LOG.error("Test resulted in an unexpected exit",
+        ExitUtils.getFirstExitException());
+    throw new AssertionError("Test resulted in an unexpected exit");
+  }
+
+  /**
+   * Blocks all incoming requests for the peer with the given leaderId, and delays
+   * the outgoing or incoming messages of all other peers (presumably by delayMs; tryEnforceLeader passes 0 to undo).
+   */
+  protected abstract void blockQueueAndSetDelay(String leaderId, int delayMs)
+      throws InterruptedException;
+
+  /**
+   * Tries once to make the given server the leader of the cluster.
+   * @param leaderId ID of the targeted leader server.
+   * @return true if the targeted server is the leader when this method
+   *         returns, false otherwise.
+   */
+  public boolean tryEnforceLeader(String leaderId) throws InterruptedException {
+    // Nothing to do if the given id is already the leader.
+    if (isLeader(leaderId)) {
+      return true;
+    }
+
+    // Delay the RPC reads of all other servers so that a read takes at
+    // least ELECTION_TIMEOUT_MIN. This way, when the target server requests a
+    // vote, all non-leader servers can grant the vote.
+    // Block the requests to the target server so that it can request a vote.
+    blockQueueAndSetDelay(leaderId, RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT);
+
+    // Reopen the queues so that the vote can make progress.
+    blockQueueAndSetDelay(leaderId, 0);
+
+    return isLeader(leaderId);
+  }
+
+  /** Blocks (block == true) or unblocks (block == false) the requests sent from the given source. */
+  public abstract void setBlockRequestsFrom(String src, boolean block);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
new file mode 100644
index 0000000..4ec78b9
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.junit.*;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ratis.RaftTestUtil.waitAndKillLeader;
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class RaftBasicTests {
+  public static final Logger LOG = LoggerFactory.getLogger(RaftBasicTests.class);
+
+  public static final int NUM_SERVERS = 5;
+
+  protected static final RaftProperties properties = new RaftProperties();
+
+  public abstract MiniRaftCluster getCluster();
+
+  public RaftProperties getProperties() {
+    return properties;
+  }
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(120 * 1000);
+
+  @Before
+  public void setup() throws IOException {
+    Assert.assertNull(getCluster().getLeader()); // the cluster must not have a leader before it is started
+    getCluster().start();
+  }
+
+  @After
+  public void tearDown() {
+    final MiniRaftCluster cluster = getCluster();
+    if (cluster != null) { // tolerate a subclass that has no cluster to shut down
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Waits for and kills the leader four times in a row; the first three calls
+   * pass {@code true} as the second argument of
+   * {@code waitAndKillLeader} and the last one passes {@code false}.
+   */
+  @Test
+  public void testBasicLeaderElection() throws Exception {
+    LOG.info("Running testBasicLeaderElection");
+    final MiniRaftCluster cluster = getCluster();
+    for (int round = 0; round < 3; round++) {
+      waitAndKillLeader(cluster, true);
+    }
+    waitAndKillLeader(cluster, false);
+  }
+
+  @Test
+  public void testBasicAppendEntries() throws Exception {
+    LOG.info("Running testBasicAppendEntries");
+    final MiniRaftCluster cluster = getCluster();
+    RaftServerImpl leader = waitForLeader(cluster);
+    final long term = leader.getState().getCurrentTerm();
+    final String killed = cluster.getFollowers().get(3).getId(); // index 3 requires at least four alive followers
+    cluster.killServer(killed);
+    LOG.info(cluster.printServers());
+
+    final SimpleMessage[] messages = SimpleMessage.create(10);
+    try(final RaftClient client = cluster.createClient("client", null)) {
+      for (SimpleMessage message : messages) {
+        client.send(message);
+      }
+    }
+
+    Thread.sleep(cluster.getMaxTimeout() + 100); // NOTE(review): presumably gives the alive servers time to catch up -- confirm
+    LOG.info(cluster.printAllLogs());
+
+    cluster.getServers().stream().filter(RaftServerImpl::isAlive) // the killed server is skipped
+        .map(s -> s.getState().getLog().getEntries(1, Long.MAX_VALUE))
+        .forEach(e -> RaftTestUtil.assertLogEntries(e, 1, term, messages));
+  }
+
+  @Test
+  public void testEnforceLeader() throws Exception {
+    LOG.info("Running testEnforceLeader");
+    final String leader = "s" + ThreadLocalRandom.current().nextInt(NUM_SERVERS); // random id in "s0".."s4"; NOTE(review): assumes the servers are named "s<i>" -- confirm
+    LOG.info("enforce leader to " + leader);
+    final MiniRaftCluster cluster = getCluster();
+    waitForLeader(cluster); // first wait until some leader exists
+    waitForLeader(cluster, leader); // NOTE(review): presumably enforces and waits for the chosen leader -- confirm in RaftTestUtil
+  }
+
+  /**
+   * A thread that sends a fixed number of messages through one client, one
+   * after another, and closes the client when it is done or has failed.
+   */
+  static class Client4TestWithLoad extends Thread {
+    final RaftClient client;
+    final SimpleMessage[] messages;
+
+    /** Index of the next message to send; the test reads it to track progress. */
+    final AtomicInteger step = new AtomicInteger();
+    /** The failure that stopped this thread, or null if there is none so far. */
+    volatile Exception exceptionInClientThread;
+
+    /**
+     * @param client the client used for sending; closed by {@link #run()}
+     * @param numMessages the number of messages to create and send
+     */
+    Client4TestWithLoad(RaftClient client, int numMessages) {
+      this.client = client;
+      this.messages = SimpleMessage.create(numMessages, client.getId());
+    }
+
+    /** @return true while there are messages left and no failure was recorded */
+    boolean isRunning() {
+      return step.get() < messages.length && exceptionInClientThread == null;
+    }
+
+    @Override
+    public void run() {
+      // try-with-resources closes the client even when a send fails.
+      try (final RaftClient c = client) {
+        while (isRunning()) {
+          c.send(messages[step.getAndIncrement()]);
+        }
+      } catch (IOException | RuntimeException e) {
+        // Unchecked failures are recorded as well: otherwise isRunning() would
+        // stay true forever and the test would spin until its timeout.
+        exceptionInClientThread = e;
+      }
+    }
+  }
+
+  @Test
+  public void testWithLoad() throws Exception {
+    testWithLoad(10, 500); // 10 clients sending 500 messages each
+  }
+
+  /**
+   * Runs {@code numClients} client threads, each sending {@code numMessages}
+   * messages, while the leader is changed every time the clients have
+   * together advanced by at least {@code 50 * numClients} steps. Afterwards
+   * the logs of the servers are checked against the messages of every client.
+   */
+  private void testWithLoad(final int numClients, final int numMessages)
+      throws Exception {
+    LOG.info("Running testWithLoad: numClients=" + numClients
+        + ", numMessages=" + numMessages);
+
+    final MiniRaftCluster cluster = getCluster();
+    LOG.info(cluster.printServers());
+
+    // client ids are the single letters "a", "b", ...
+    final List<Client4TestWithLoad> clients
+        = Stream.iterate(0, i -> i + 1).limit(numClients)
+        .map(i -> String.valueOf((char)('a' + i)))
+        .map(id -> new Client4TestWithLoad(cluster.createClient(id, null), numMessages))
+        .collect(Collectors.toList());
+    for (Client4TestWithLoad c : clients) {
+      c.start();
+    }
+
+    int count = 0;
+    int lastStep = 0;
+    while (clients.stream().anyMatch(Client4TestWithLoad::isRunning)) {
+      int totalSteps = 0;
+      for (Client4TestWithLoad c : clients) {
+        totalSteps += c.step.get();
+      }
+      if (totalSteps - lastStep < 50 * numClients) { // Change leader at least 50 steps.
+        Thread.sleep(10);
+        continue;
+      }
+      lastStep = totalSteps;
+      count++;
+
+      final RaftServerImpl leader = cluster.getLeader();
+      if (leader == null) {
+        continue; // no leader to replace at the moment
+      }
+      final String oldLeader = leader.getId();
+      LOG.info("Block all requests sent by leader " + oldLeader);
+      final String newLeader = RaftTestUtil.changeLeader(cluster, oldLeader);
+      LOG.info("Changed leader from " + oldLeader + " to " + newLeader);
+      Assert.assertFalse(newLeader.equals(oldLeader));
+    }
+
+    for (Client4TestWithLoad c : clients) {
+      c.join();
+    }
+    for (Client4TestWithLoad c : clients) {
+      if (c.exceptionInClientThread != null) {
+        throw new AssertionError(c.exceptionInClientThread);
+      }
+      RaftTestUtil.assertLogEntries(cluster.getServers(), c.messages);
+    }
+
+    LOG.info("Leader change count=" + count + cluster.printAllLogs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
new file mode 100644
index 0000000..6d25835
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.client.impl.RaftClientImpl;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.simulation.RequestHandler;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.*;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+public abstract class RaftNotLeaderExceptionBaseTest {
+  static {
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(RaftNotLeaderExceptionBaseTest.class);
+  public static final int NUM_PEERS = 3;
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(60 * 1000);
+
+  private MiniRaftCluster cluster;
+
+  public abstract MiniRaftCluster initCluster() throws IOException;
+
+  @Before
+  public void setup() throws IOException {
+    this.cluster = initCluster(); // a fresh cluster from the subclass for every test
+    cluster.start();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) { // null when initCluster() failed in setup()
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * After a leader change, a request sent directly to the old leader must be
+   * answered with a not-leader reply that suggests the new leader, and the
+   * client itself must still be able to send a message successfully.
+   */
+  @Test
+  public void testHandleNotLeaderException() throws Exception {
+    RaftTestUtil.waitForLeader(cluster);
+    final String leaderId = cluster.getLeader().getId();
+    // try-with-resources: the client is closed even when an assertion fails
+    try (final RaftClient client = cluster.createClient("client", leaderId)) {
+      RaftClientReply reply = client.send(new SimpleMessage("m1"));
+      Assert.assertTrue(reply.isSuccess());
+
+      // enforce leader change
+      String newLeader = RaftTestUtil.changeLeader(cluster, leaderId);
+      Assert.assertNotEquals(leaderId, newLeader);
+
+      // address the old leader through the raw request sender; a request
+      // failing with an IOException is retried, up to 10 attempts
+      RaftClientRequestSender rpc = ((RaftClientImpl)client).getRequestSender();
+      reply = null;
+      for (int i = 0; reply == null && i < 10; i++) {
+        try {
+          reply = rpc.sendRequest(
+              new RaftClientRequest("client", leaderId, DEFAULT_SEQNUM,
+                  new SimpleMessage("m2")));
+        } catch (IOException ignored) {
+          Thread.sleep(1000);
+        }
+      }
+      Assert.assertNotNull(reply);
+      Assert.assertFalse(reply.isSuccess());
+      Assert.assertTrue(reply.isNotLeader());
+      Assert.assertEquals(newLeader,
+          reply.getNotLeaderException().getSuggestedLeader().getId());
+
+      reply = client.send(new SimpleMessage("m3"));
+      Assert.assertTrue(reply.isSuccess());
+    }
+  }
+
+  /**
+   * Like {@link #testHandleNotLeaderException()}, but two peers are added to
+   * the configuration after the leader change: the not-leader reply of the
+   * old leader must then also carry the peers of the cluster.
+   */
+  @Test
+  public void testNotLeaderExceptionWithReconf() throws Exception {
+    Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster));
+
+    final String leaderId = cluster.getLeader().getId();
+    // try-with-resources: the client is closed even when an assertion fails
+    try (final RaftClient client = cluster.createClient("client", leaderId)) {
+      // enforce leader change
+      String newLeader = RaftTestUtil.changeLeader(cluster, leaderId);
+      Assert.assertNotEquals(leaderId, newLeader);
+
+      // also add two new peers
+      MiniRaftCluster.PeerChanges change = cluster.addNewPeers(
+          new String[]{"ss1", "ss2"}, true);
+      // trigger setConfiguration
+      LOG.info("Start changing the configuration: {}",
+          Arrays.asList(change.allPeersInNewConf));
+      try(final RaftClient c2 = cluster.createClient("client2", newLeader)) {
+        RaftClientReply reply = c2.setConfiguration(change.allPeersInNewConf);
+        Assert.assertTrue(reply.isSuccess());
+      }
+      LOG.info(cluster.printServers());
+
+      RaftClientRequestSender rpc = ((RaftClientImpl)client).getRequestSender();
+      RaftClientReply reply = null;
+      // it is possible that the remote peer's rpc server is not ready. need retry
+      for (int i = 0; reply == null && i < 10; i++) {
+        try {
+          reply = rpc.sendRequest(
+              new RaftClientRequest("client", leaderId, DEFAULT_SEQNUM,
+                  new SimpleMessage("m1")));
+        } catch (IOException ignored) {
+          Thread.sleep(1000);
+        }
+      }
+      Assert.assertNotNull(reply);
+      Assert.assertFalse(reply.isSuccess());
+      Assert.assertTrue(reply.isNotLeader());
+      Assert.assertEquals(newLeader,
+          reply.getNotLeaderException().getSuggestedLeader().getId());
+      // the reply must list exactly the peers of the cluster
+      Collection<RaftPeer> peers = cluster.getPeers();
+      RaftPeer[] peersFromReply = reply.getNotLeaderException().getPeers();
+      Assert.assertEquals(peers.size(), peersFromReply.length);
+      for (RaftPeer p : peersFromReply) {
+        Assert.assertTrue(peers.contains(p));
+      }
+
+      reply = client.send(new SimpleMessage("m2"));
+      Assert.assertTrue(reply.isSuccess());
+    }
+  }
+}