You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/01/31 21:17:20 UTC

[35/54] [abbrv] incubator-ratis git commit: Renamed the packages from raft to ratis in preparation for Apache Incubation - Moved all java packages from org.apache.raft to org.apache.ratis. - Moved native package to org_apache_ratis, and native lib to l

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java b/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java
deleted file mode 100644
index 293e1a4..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.commons.io.Charsets;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.impl.ConfigurationManager;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.server.storage.RaftStorageDirectory.LogPathAndIndex;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.AutoCloseableLock;
-import org.apache.raft.util.CodeInjectionForTesting;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT;
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY;
-
-/**
- * The RaftLog implementation that writes log entries into segmented files in
- * local disk.
- *
- * The max log segment size is 8MB. The real log segment size may not be
- * exactly equal to this limit. If a log entry's size exceeds 8MB, this entry
- * will be stored in a single segment.
- *
- * There are two types of segments: closed segment and open segment. The former
- * is named as "log_startindex-endindex", the later is named as
- * "log_inprogress_startindex".
- *
- * There can be multiple closed segments but there is at most one open segment.
- * When the open segment reaches the size limit, or the log term increases, we
- * close the open segment and start a new open segment. A closed segment cannot
- * be appended anymore, but it can be truncated in case that a follower's log is
- * inconsistent with the current leader.
- *
- * Every closed segment should be non-empty, i.e., it should contain at least
- * one entry.
- *
- * There should not be any gap between segments. The first segment may not start
- * from index 0 since there may be snapshots as log compaction. The last index
- * in segments should be no smaller than the last index of snapshot, otherwise
- * we may have hole when append further log.
- */
-public class SegmentedRaftLog extends RaftLog {
-  static final String HEADER_STR = "RAFTLOG1";
-  static final byte[] HEADER_BYTES = HEADER_STR.getBytes(Charsets.UTF_8);
-
-  /**
-   * I/O task definitions.
-   */
-  static abstract class Task {
-    private boolean done = false;
-
-    synchronized void done() {
-      done = true;
-      notifyAll();
-    }
-
-    synchronized void waitForDone() throws InterruptedException {
-      while (!done) {
-        wait();
-      }
-    }
-
-    abstract void execute() throws IOException;
-
-    abstract long getEndIndex();
-
-    @Override
-    public String toString() {
-      return getClass().getSimpleName() + "-" + getEndIndex();
-    }
-  }
-  private static final ThreadLocal<Task> myTask = new ThreadLocal<>();
-
-  private final RaftStorage storage;
-  private final RaftLogCache cache;
-  private final RaftLogWorker fileLogWorker;
-  private final long segmentMaxSize;
-
-  public SegmentedRaftLog(String selfId, RaftServerImpl server, RaftStorage storage,
-                          long lastIndexInSnapshot, RaftProperties properties) throws IOException {
-    super(selfId);
-    this.storage = storage;
-    this.segmentMaxSize = properties.getLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY,
-        RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT);
-    cache = new RaftLogCache();
-    fileLogWorker = new RaftLogWorker(server, storage, properties);
-    lastCommitted.set(lastIndexInSnapshot);
-  }
-
-  @Override
-  public void open(ConfigurationManager confManager, long lastIndexInSnapshot)
-      throws IOException {
-    loadLogSegments(confManager, lastIndexInSnapshot);
-    File openSegmentFile = null;
-    if (cache.getOpenSegment() != null) {
-      openSegmentFile = storage.getStorageDir()
-          .getOpenLogFile(cache.getOpenSegment().getStartIndex());
-    }
-    fileLogWorker.start(Math.max(cache.getEndIndex(), lastIndexInSnapshot),
-        openSegmentFile);
-    super.open(confManager, lastIndexInSnapshot);
-  }
-
-  @Override
-  public long getStartIndex() {
-    return cache.getStartIndex();
-  }
-
-  private void loadLogSegments(ConfigurationManager confManager,
-      long lastIndexInSnapshot) throws IOException {
-    try(AutoCloseableLock writeLock = writeLock()) {
-      List<LogPathAndIndex> paths = storage.getStorageDir().getLogSegmentFiles();
-      for (LogPathAndIndex pi : paths) {
-        LogSegment logSegment = parseLogSegment(pi, confManager);
-        cache.addSegment(logSegment);
-      }
-
-      // if the largest index is smaller than the last index in snapshot, we do
-      // not load the log to avoid holes between log segments. This may happen
-      // when the local I/O worker is too slow to persist log (slower than
-      // committing the log and taking snapshot)
-      if (!cache.isEmpty() && cache.getEndIndex() < lastIndexInSnapshot) {
-        LOG.warn("End log index {} is smaller than last index in snapshot {}",
-            cache.getEndIndex(), lastIndexInSnapshot);
-        cache.clear();
-        // TODO purge all segment files
-      }
-    }
-  }
-
-  private LogSegment parseLogSegment(LogPathAndIndex pi,
-      ConfigurationManager confManager) throws IOException {
-    final boolean isOpen = pi.endIndex == RaftServerConstants.INVALID_LOG_INDEX;
-    return LogSegment.loadSegment(pi.path.toFile(), pi.startIndex, pi.endIndex,
-        isOpen, confManager);
-  }
-
-  @Override
-  public LogEntryProto get(long index) {
-    checkLogState();
-    try(AutoCloseableLock readLock = readLock()) {
-      return cache.getEntry(index);
-    }
-  }
-
-  @Override
-  public LogEntryProto[] getEntries(long startIndex, long endIndex) {
-    checkLogState();
-    try(AutoCloseableLock readLock = readLock()) {
-      return cache.getEntries(startIndex, endIndex);
-    }
-  }
-
-  @Override
-  public LogEntryProto getLastEntry() {
-    checkLogState();
-    try(AutoCloseableLock readLock = readLock()) {
-      return cache.getLastEntry();
-    }
-  }
-
-  /**
-   * The method, along with {@link #appendEntry} and
-   * {@link #append(LogEntryProto...)} need protection of RaftServer's lock.
-   */
-  @Override
-  void truncate(long index) {
-    checkLogState();
-    try(AutoCloseableLock writeLock = writeLock()) {
-      RaftLogCache.TruncationSegments ts = cache.truncate(index);
-      if (ts != null) {
-        Task task = fileLogWorker.truncate(ts);
-        myTask.set(task);
-      }
-    }
-  }
-
-  @Override
-  void appendEntry(LogEntryProto entry) {
-    checkLogState();
-    try(AutoCloseableLock writeLock = writeLock()) {
-      final LogSegment currentOpenSegment = cache.getOpenSegment();
-      if (currentOpenSegment == null) {
-        cache.addSegment(LogSegment.newOpenSegment(entry.getIndex()));
-        fileLogWorker.startLogSegment(getNextIndex());
-      } else if (isSegmentFull(currentOpenSegment, entry)) {
-        cache.rollOpenSegment(true);
-        fileLogWorker.rollLogSegment(currentOpenSegment);
-      } else if (currentOpenSegment.numOfEntries() > 0 &&
-          currentOpenSegment.getLastRecord().entry.getTerm() != entry.getTerm()) {
-        // the term changes
-        final long currentTerm = currentOpenSegment.getLastRecord().entry
-            .getTerm();
-        Preconditions.checkState(currentTerm < entry.getTerm(),
-            "open segment's term %s is larger than the new entry's term %s",
-            currentTerm, entry.getTerm());
-        cache.rollOpenSegment(true);
-        fileLogWorker.rollLogSegment(currentOpenSegment);
-      }
-
-      cache.appendEntry(entry);
-      myTask.set(fileLogWorker.writeLogEntry(entry));
-    }
-  }
-
-  private boolean isSegmentFull(LogSegment segment, LogEntryProto entry) {
-    if (segment.getTotalSize() >= segmentMaxSize) {
-      return true;
-    } else {
-      final long entrySize = LogSegment.getEntrySize(entry);
-      // if entry size is greater than the max segment size, write it directly
-      // into the current segment
-      return entrySize <= segmentMaxSize &&
-          segment.getTotalSize() + entrySize > segmentMaxSize;
-    }
-  }
-
-  @Override
-  public void append(LogEntryProto... entries) {
-    checkLogState();
-    if (entries == null || entries.length == 0) {
-      return;
-    }
-    try(AutoCloseableLock writeLock = writeLock()) {
-      Iterator<LogEntryProto> iter = cache.iterator(entries[0].getIndex());
-      int index = 0;
-      long truncateIndex = -1;
-      for (; iter.hasNext() && index < entries.length; index++) {
-        LogEntryProto storedEntry = iter.next();
-        Preconditions.checkState(
-            storedEntry.getIndex() == entries[index].getIndex(),
-            "The stored entry's index %s is not consistent with" +
-                " the received entries[%s]'s index %s", storedEntry.getIndex(),
-            index, entries[index].getIndex());
-
-        if (storedEntry.getTerm() != entries[index].getTerm()) {
-          // we should truncate from the storedEntry's index
-          truncateIndex = storedEntry.getIndex();
-          break;
-        }
-      }
-      if (truncateIndex != -1) {
-        // truncate from truncateIndex
-        truncate(truncateIndex);
-      }
-      // append from entries[index]
-      for (int i = index; i < entries.length; i++) {
-        appendEntry(entries[i]);
-      }
-    }
-  }
-
-  @Override
-  public void logSync() throws InterruptedException {
-    CodeInjectionForTesting.execute(LOG_SYNC, getSelfId(), null);
-    final Task task = myTask.get();
-    if (task != null) {
-      task.waitForDone();
-    }
-  }
-
-  @Override
-  public long getLatestFlushedIndex() {
-    return fileLogWorker.getFlushedIndex();
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * This operation is protected by the RaftServer's lock
-   */
-  @Override
-  public void writeMetadata(long term, String votedFor) throws IOException {
-    storage.getMetaFile().set(term, votedFor);
-  }
-
-  @Override
-  public Metadata loadMetadata() throws IOException {
-    return new Metadata(storage.getMetaFile().getVotedFor(),
-        storage.getMetaFile().getTerm());
-  }
-
-  @Override
-  public void syncWithSnapshot(long lastSnapshotIndex) {
-    fileLogWorker.syncWithSnapshot(lastSnapshotIndex);
-    // TODO purge log files and normal/tmp/corrupt snapshot files
-    // if the last index in snapshot is larger than the index of the last
-    // log entry, we should delete all the log entries and their cache to avoid
-    // gaps between log segments.
-  }
-
-  @Override
-  public void close() throws IOException {
-    super.close();
-    fileLogWorker.close();
-    storage.close();
-  }
-
-  @VisibleForTesting
-  RaftLogCache getRaftLogCache() {
-    return cache;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/SnapshotManager.java b/raft-server/src/main/java/org/apache/raft/server/storage/SnapshotManager.java
deleted file mode 100644
index 8ab2833..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/SnapshotManager.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.io.MD5Hash;
-import org.apache.raft.shaded.proto.RaftProtos.FileChunkProto;
-import org.apache.raft.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.util.FileUtils;
-import org.apache.raft.util.MD5FileUtil;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-
-/**
- * Manage snapshots of a raft peer.
- * TODO: snapshot should be treated as compaction log thus can be merged into
- *       RaftLog. In this way we can have a unified getLastTermIndex interface.
- */
-public class SnapshotManager {
-  private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);
-
-  private final RaftStorage storage;
-  private final String selfId;
-
-  /**
-   * @param storage the storage whose directory receives installed snapshots
-   * @param selfId id of this peer, used in error messages
-   */
-  public SnapshotManager(RaftStorage storage, String selfId)
-      throws IOException {
-    this.storage = storage;
-    this.selfId = selfId;
-  }
-
-  /**
-   * Writes the file chunks carried by the request into a temporary directory
-   * and, when the request is marked as done, renames that directory to the
-   * state machine directory.
-   *
-   * @param stateMachine queried for its latest snapshot before each chunk
-   * @param request the install-snapshot request holding the file chunks
-   * @throws IOException if the temporary directory cannot be created, a
-   *         snapshot at or beyond the requested index already exists, a chunk
-   *         cannot be written, the MD5 digest of a completed file does not
-   *         match, or the final rename fails
-   */
-  public void installSnapshot(StateMachine stateMachine,
-      InstallSnapshotRequestProto request) throws IOException {
-    final long lastIncludedIndex = request.getTermIndex().getIndex();
-    final RaftStorageDirectory dir = storage.getStorageDir();
-
-    final File tmpDir = dir.getNewTempDir();
-    // mkdirs() returns false both on failure and when the directory already
-    // exists, so only fail when the directory is really missing.
-    if (!tmpDir.mkdirs() && !tmpDir.isDirectory()) {
-      throw new IOException("Failed to create tmp dir " + tmpDir
-          + " for installing snapshot-" + lastIncludedIndex);
-    }
-    tmpDir.deleteOnExit();
-
-    LOG.info("Installing snapshot:{}, to tmp dir:{}", request, tmpDir);
-
-    // TODO: Make sure that subsequent requests for the same installSnapshot are coming in order,
-    // and are not lost when whole request cycle is done. Check requestId and requestIndex here
-
-    for (FileChunkProto chunk : request.getFileChunksList()) {
-      SnapshotInfo pi = stateMachine.getLatestSnapshot();
-      if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) {
-        throw new IOException("There exists snapshot file "
-            + pi.getFiles() + " in " + selfId
-            + " with endIndex >= lastIncludedIndex " + lastIncludedIndex);
-      }
-
-      String fileName = chunk.getFilename(); // this is relative to the root dir
-      // TODO: assumes flat layout inside SM dir
-      File tmpSnapshotFile = new File(tmpDir,
-          new File(dir.getRoot(), fileName).getName());
-
-      final boolean firstChunk = chunk.getOffset() == 0;
-      if (firstChunk) {
-        // if offset is 0, delete any existing temp snapshot file if it has the
-        // same last index.
-        if (tmpSnapshotFile.exists()) {
-          FileUtils.fullyDelete(tmpSnapshotFile);
-        }
-      } else {
-        Preconditions.checkState(tmpSnapshotFile.exists());
-      }
-
-      // The first chunk creates the file; later chunks open it in append mode.
-      // try-with-resources closes the stream and propagates close failures.
-      try (FileOutputStream out =
-               new FileOutputStream(tmpSnapshotFile, !firstChunk)) {
-        if (!firstChunk) {
-          // NOTE(review): in append mode writes presumably always go to the
-          // end of the file regardless of this position -- confirm that
-          // chunks are guaranteed to arrive in order.
-          FileChannel fc = out.getChannel();
-          fc.position(chunk.getOffset());
-        }
-
-        // write data to the file
-        out.write(chunk.getData().toByteArray());
-      }
-
-      // rename the temp snapshot file if this is the last chunk. also verify
-      // the md5 digest and create the md5 meta-file.
-      if (chunk.getDone()) {
-        final MD5Hash expectedDigest =
-            new MD5Hash(chunk.getFileDigest().toByteArray());
-        // calculate the checksum of the snapshot file and compare it with the
-        // file digest in the request
-        MD5Hash digest = MD5FileUtil.computeMd5ForFile(tmpSnapshotFile);
-        if (!digest.equals(expectedDigest)) {
-          LOG.warn("The snapshot md5 digest {} does not match expected {}",
-              digest, expectedDigest);
-          // TODO: rename the temp snapshot file to .corrupt
-          throw new IOException("MD5 mismatch for snapshot-" + lastIncludedIndex
-              + " installation");
-        } else {
-          MD5FileUtil.saveMD5File(tmpSnapshotFile, digest);
-        }
-      }
-    }
-
-    if (request.getDone()) {
-      final File smDir = dir.getStateMachineDir();
-      LOG.info("Install snapshot is done, renaming tmp dir:{} to:{}",
-          tmpDir, smDir);
-      if (smDir.exists() && !smDir.delete()) {
-        LOG.warn("Failed to delete existing state machine dir:{}", smDir);
-      }
-      // Without this check a failed rename would be reported as a
-      // successful installation.
-      if (!tmpDir.renameTo(smDir)) {
-        throw new IOException("Failed to rename tmp dir " + tmpDir + " to "
-            + smDir + " for snapshot-" + lastIncludedIndex);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java b/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java
deleted file mode 100644
index ccc52c7..0000000
--- a/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.raft.statemachine;
-
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.Message;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.server.impl.RaftConfiguration;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.server.storage.RaftStorage;
-import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto;
-import org.apache.raft.util.LifeCycle;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Base implementation for StateMachines.
- */
-public class BaseStateMachine implements StateMachine {
-
-  protected RaftProperties properties;
-  protected RaftStorage storage;
-  protected RaftConfiguration raftConf;
-  protected final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName());
-
-  @Override
-  public LifeCycle.State getLifeCycleState() {
-    return lifeCycle.getCurrentState();
-  }
-
-  @Override
-  public void initialize(String id, RaftProperties properties, RaftStorage storage)
-      throws IOException {
-    lifeCycle.setName(getClass().getSimpleName() + ":" + id);
-    this.properties = properties;
-    this.storage = storage;
-  }
-
-  @Override
-  public void setRaftConfiguration(RaftConfiguration conf) {
-    this.raftConf = conf;
-  }
-
-  @Override
-  public RaftConfiguration getRaftConfiguration() {
-    return this.raftConf;
-  }
-
-  @Override
-  public SnapshotInfo getLatestSnapshot() {
-    return getStateMachineStorage().getLatestSnapshot();
-  }
-
-  @Override
-  public void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException {
-    // do nothing
-  }
-
-  public void pause() {
-  }
-
-  @Override
-  public void reinitialize(String id, RaftProperties properties, RaftStorage storage)
-      throws IOException {
-  }
-
-  @Override
-  public TransactionContext applyTransactionSerial(TransactionContext trx) throws IOException {
-    return trx;
-  }
-
-  @Override
-  public CompletableFuture<Message> applyTransaction(TransactionContext trx) throws IOException {
-    // return the same message contained in the entry
-    Message msg = () -> trx.getLogEntry().get().getSmLogEntry().getData();
-    return CompletableFuture.completedFuture(msg);
-  }
-
-  @Override
-  public long takeSnapshot() throws IOException {
-    return RaftServerConstants.INVALID_LOG_INDEX;
-  }
-
-  @Override
-  public StateMachineStorage getStateMachineStorage() {
-    return new StateMachineStorage() {
-      @Override
-      public void init(RaftStorage raftStorage) throws IOException {
-      }
-
-      @Override
-      public SnapshotInfo getLatestSnapshot() {
-        return null;
-      }
-
-      @Override
-      public void format() throws IOException {
-      }
-    };
-  }
-
-  @Override
-  public CompletableFuture<RaftClientReply> query(
-      RaftClientRequest request) {
-    return null;
-  }
-
-  @Override
-  public TransactionContext startTransaction(RaftClientRequest request)
-      throws IOException {
-    return new TransactionContext(this, request,
-        SMLogEntryProto.newBuilder()
-            .setData(request.getMessage().getContent())
-            .build());
-  }
-
-  @Override
-  public TransactionContext cancelTransaction(TransactionContext trx) throws IOException {
-    return trx;
-  }
-
-  @Override
-  public TransactionContext preAppendTransaction(TransactionContext trx) throws IOException {
-    return trx;
-  }
-
-  @Override
-  public void close() throws IOException {
-    // do nothing
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/FileListSnapshotInfo.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/FileListSnapshotInfo.java b/raft-server/src/main/java/org/apache/raft/statemachine/FileListSnapshotInfo.java
deleted file mode 100644
index b65fc13..0000000
--- a/raft-server/src/main/java/org/apache/raft/statemachine/FileListSnapshotInfo.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.FileInfo;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Each snapshot has a list of files.
- *
- * The objects of this class are immutable: the file list is copied on
- * construction and exposed only through an unmodifiable view.
- */
-public class FileListSnapshotInfo implements SnapshotInfo {
-  private final TermIndex termIndex;
-  private final List<FileInfo> files;
-
-  /**
-   * @param files the files making up the snapshot; must not be null
-   * @param term the term of the snapshot
-   * @param index the index of the snapshot
-   */
-  public FileListSnapshotInfo(List<FileInfo> files, long term, long index) {
-    this.termIndex = TermIndex.newTermIndex(term, index);
-    // Copy first: an unmodifiable view alone would still reflect later
-    // changes the caller makes to its own list.
-    this.files = Collections.unmodifiableList(
-        new java.util.ArrayList<>(files));
-  }
-
-  @Override
-  public TermIndex getTermIndex() {
-    return termIndex;
-  }
-
-  @Override
-  public long getTerm() {
-    return termIndex.getTerm();
-  }
-
-  @Override
-  public long getIndex() {
-    return termIndex.getIndex();
-  }
-
-  /** @return an unmodifiable list of the snapshot files. */
-  @Override
-  public List<FileInfo> getFiles() {
-    return files;
-  }
-
-  @Override
-  public String toString() {
-    return termIndex + ":" + files;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java b/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java
deleted file mode 100644
index a779f98..0000000
--- a/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.raft.io.MD5Hash;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.FileInfo;
-import org.apache.raft.server.storage.RaftStorage;
-import org.apache.raft.util.AtomicFileOutputStream;
-import org.apache.raft.util.MD5FileUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * A StateMachineStorage that stores the snapshot in a single file.
- */
-public class SimpleStateMachineStorage implements StateMachineStorage {
-
-  private static final Logger LOG = LoggerFactory.getLogger(SimpleStateMachineStorage.class);
-
-  static final String SNAPSHOT_FILE_PREFIX = "snapshot";
-  static final String CORRUPT_SNAPSHOT_FILE_SUFFIX = ".corrupt";
-  /** snapshot.term_index */
-  static final Pattern SNAPSHOT_REGEX =
-      Pattern.compile(SNAPSHOT_FILE_PREFIX + "\\.(\\d+)_(\\d+)");
-
-  private RaftStorage raftStorage;
-  private File smDir = null;
-
-  private volatile SingleFileSnapshotInfo currentSnapshot = null;
-
-  public void init(RaftStorage raftStorage) throws IOException {
-    this.raftStorage = raftStorage;
-    this.smDir = raftStorage.getStorageDir().getStateMachineDir();
-    loadLatestSnapshot();
-  }
-
-  @Override
-  public void format() throws IOException {
-    // TODO
-  }
-
-  @VisibleForTesting
-  public static TermIndex getTermIndexFromSnapshotFile(File file) {
-    final String name = file.getName();
-    final Matcher m = SNAPSHOT_REGEX.matcher(name);
-    if (!m.matches()) {
-      throw new IllegalArgumentException("File \"" + file
-          + "\" does not match snapshot file name pattern \""
-          + SNAPSHOT_REGEX + "\"");
-    }
-    final long term = Long.parseLong(m.group(1));
-    final long index = Long.parseLong(m.group(2));
-    return TermIndex.newTermIndex(term, index);
-  }
-
-  protected static String getTmpSnapshotFileName(long term, long endIndex) {
-    return getSnapshotFileName(term, endIndex) + AtomicFileOutputStream.TMP_EXTENSION;
-  }
-
-  protected static String getCorruptSnapshotFileName(long term, long endIndex) {
-    return getSnapshotFileName(term, endIndex) + CORRUPT_SNAPSHOT_FILE_SUFFIX;
-  }
-
-  public File getSnapshotFile(long term, long endIndex) {
-    return new File(smDir, getSnapshotFileName(term, endIndex));
-  }
-
-  protected File getTmpSnapshotFile(long term, long endIndex) {
-    return new File(smDir, getTmpSnapshotFileName(term, endIndex));
-  }
-
-  protected File getCorruptSnapshotFile(long term, long endIndex) {
-    return new File(smDir, getCorruptSnapshotFileName(term, endIndex));
-  }
-
-  public SingleFileSnapshotInfo findLatestSnapshot() throws IOException {
-    SingleFileSnapshotInfo latest = null;
-    try (DirectoryStream<Path> stream =
-             Files.newDirectoryStream(smDir.toPath())) {
-      for (Path path : stream) {
-        Matcher matcher = SNAPSHOT_REGEX.matcher(path.getFileName().toString());
-        if (matcher.matches()) {
-          final long endIndex = Long.parseLong(matcher.group(2));
-          if (latest == null || endIndex > latest.getIndex()) {
-            final long term = Long.parseLong(matcher.group(1));
-            MD5Hash fileDigest = MD5FileUtil.readStoredMd5ForFile(path.toFile());
-            final FileInfo fileInfo = new FileInfo(path, fileDigest);
-            latest = new SingleFileSnapshotInfo(fileInfo, term, endIndex);
-          }
-        }
-      }
-    }
-    return latest;
-  }
-
-  public void loadLatestSnapshot() throws IOException {
-    this.currentSnapshot = findLatestSnapshot();
-  }
-
-  public static String getSnapshotFileName(long term, long endIndex) {
-    return SNAPSHOT_FILE_PREFIX + "." + term + "_" + endIndex;
-  }
-
-  @Override
-  public SingleFileSnapshotInfo getLatestSnapshot() {
-    return currentSnapshot;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/SingleFileSnapshotInfo.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/SingleFileSnapshotInfo.java b/raft-server/src/main/java/org/apache/raft/statemachine/SingleFileSnapshotInfo.java
deleted file mode 100644
index 6b01e17..0000000
--- a/raft-server/src/main/java/org/apache/raft/statemachine/SingleFileSnapshotInfo.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import org.apache.raft.server.storage.FileInfo;
-
-import java.util.Arrays;
-
-/**
- * Each snapshot only has a single file.
- *
- * The objects of this class are immutable.
- */
-public class SingleFileSnapshotInfo extends FileListSnapshotInfo {
-  public SingleFileSnapshotInfo(FileInfo fileInfo, long term, long endIndex) {
-    super(Arrays.asList(fileInfo), term, endIndex);
-  }
-
-  /** @return the file associated with the snapshot. */
-  public FileInfo getFile() {
-    return getFiles().get(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/SnapshotInfo.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/SnapshotInfo.java b/raft-server/src/main/java/org/apache/raft/statemachine/SnapshotInfo.java
deleted file mode 100644
index 0fdcbc3..0000000
--- a/raft-server/src/main/java/org/apache/raft/statemachine/SnapshotInfo.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.FileInfo;
-
-import java.util.List;
-
-/**
- * SnapshotInfo represents a durable state by the state machine. The state machine implementation is
- * responsible for the layout of the snapshot files as well as making the data durable. Latest term,
- * latest index, and the raft configuration must be saved together with any data files in the
- * snapshot.
- */
-public interface SnapshotInfo {
-
-  /**
-   * Returns the term and index corresponding to this snapshot.
-   * @return The term and index corresponding to this snapshot.
-   */
-  TermIndex getTermIndex();
-
-  /**
-   * Returns the term corresponding to this snapshot.
-   * @return The term corresponding to this snapshot.
-   */
-  long getTerm();
-
-  /**
-   * Returns the index corresponding to this snapshot.
-   * @return The index corresponding to this snapshot.
-   */
-  long getIndex();
-
-  /**
-   * Returns a list of files corresponding to this snapshot. This list should include all
-   * the files that the state machine keeps in its data directory. This list of files will be
-   * copied as to other replicas in install snapshot RPCs.
-   * @return a list of Files corresponding to the this snapshot.
-   */
-  List<FileInfo> getFiles();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/StateMachine.java b/raft-server/src/main/java/org/apache/raft/statemachine/StateMachine.java
deleted file mode 100644
index 3dedf88..0000000
--- a/raft-server/src/main/java/org/apache/raft/statemachine/StateMachine.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.Message;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.server.RaftServerConfigKeys;
-import org.apache.raft.server.impl.RaftConfiguration;
-import org.apache.raft.server.storage.RaftStorage;
-import org.apache.raft.util.LifeCycle;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * StateMachine is the entry point for the custom implementation of replicated state as defined in
- * the "State Machine Approach" in the literature
- * (see https://en.wikipedia.org/wiki/State_machine_replication).
- */
-public interface StateMachine extends Closeable {
-  /**
-   * Initializes the State Machine with the given properties and storage. The state machine is
-   * responsible reading the latest snapshot from the file system (if any) and initialize itself
-   * with the latest term and index there including all the edits.
-   */
-  void initialize(String id, RaftProperties properties, RaftStorage storage)
-      throws IOException;
-
-  /**
-   * Returns the lifecycle state for this StateMachine.
-   * @return the lifecycle state.
-   */
-  LifeCycle.State getLifeCycleState();
-
-  /**
-   * Pauses the state machine. On return, the state machine should have closed all open files so
-   * that a new snapshot can be installed.
-   */
-  void pause();
-
-  /**
-   * Re-initializes the State Machine in PAUSED state with the given properties and storage. The
-   * state machine is responsible reading the latest snapshot from the file system (if any) and
-   * initialize itself with the latest term and index there including all the edits.
-   */
-  void reinitialize(String id, RaftProperties properties, RaftStorage storage)
-      throws IOException;
-
-  /**
-   * Dump the in-memory state into a snapshot file in the RaftStorage. The
-   * StateMachine implementation can decide 1) its own snapshot format, 2) when
-   * a snapshot is taken, and 3) how the snapshot is taken (e.g., whether the
-   * snapshot blocks the state machine, and whether to purge log entries after
-   * a snapshot is done).
-   *
-   * In the meanwhile, when the size of raft log outside of the latest snapshot
-   * exceeds certain threshold, the RaftServer may choose to trigger a snapshot
-   * if {@link RaftServerConfigKeys#RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY} is
-   * enabled.
-   *
-   * The snapshot should include the latest raft configuration.
-   *
-   * @return the largest index of the log entry that has been applied to the
-   *         state machine and also included in the snapshot. Note the log purge
-   *         should be handled separately.
-   */
-  // TODO: refactor this
-  long takeSnapshot() throws IOException;
-
-  /**
-   * Record the RaftConfiguration in the state machine. The RaftConfiguration
-   * should also be stored in the snapshot.
-   */
-  void setRaftConfiguration(RaftConfiguration conf);
-
-  /**
-   * @return the latest raft configuration recorded in the state machine.
-   */
-  RaftConfiguration getRaftConfiguration();
-
-  /**
-   * @return StateMachineStorage to interact with the durability guarantees provided by the
-   * state machine.
-   */
-  StateMachineStorage getStateMachineStorage();
-
-  /**
-   * Returns the information for the latest durable snapshot.
-   */
-  SnapshotInfo getLatestSnapshot();
-
-  /**
-   * Query the state machine. The request must be read-only.
-   * TODO: extend RaftClientRequest to have a read-only request subclass.
-   */
-  CompletableFuture<RaftClientReply> query(RaftClientRequest request);
-
-  /**
-   * Validate/pre-process the incoming update request in the state machine.
-   * @return the content to be written to the log entry. Null means the request
-   * should be rejected.
-   * @throws IOException thrown by the state machine while validation
-   */
-  TransactionContext startTransaction(RaftClientRequest request)
-      throws IOException;
-
-  /**
-   * This is called before the transaction passed from the StateMachine is appended to the raft log.
-   * This method will be called from log append and having the same strict serial order that the
-   * transactions will have in the RAFT log. Since this is called serially in the critical path of
-   * log append, it is important to do only required operations here.
-   * @return The Transaction context.
-   */
-  TransactionContext preAppendTransaction(TransactionContext trx) throws IOException;
-
-  /**
-   * Called to notify the state machine that the Transaction passed cannot be appended (or synced).
-   * The exception field will indicate whether there was an exception or not.
-   * @param trx the transaction to cancel
-   * @return cancelled transaction
-   */
-  TransactionContext cancelTransaction(TransactionContext trx) throws IOException;
-
-  /**
-   * Called for transactions that have been committed to the RAFT log. This step is called
-   * sequentially in strict serial order that the transactions have been committed in the log.
-   * The SM is expected to do only necessary work, and leave the actual apply operation to the
-   * applyTransaction calls that can happen concurrently.
-   * @param trx the transaction state including the log entry that has been committed to a quorum
-   *            of the raft peers
-   * @return The Transaction context.
-   */
-  TransactionContext applyTransactionSerial(TransactionContext trx) throws IOException;
-
-  /**
-   * Apply a committed log entry to the state machine. This method can be called concurrently with
-   * the other calls, and there is no guarantee that the calls will be ordered according to the
-   * log commit order.
-   * @param trx the transaction state including the log entry that has been committed to a quorum
-   *            of the raft peers
-   */
-  // TODO: We do not need to return CompletableFuture
-  CompletableFuture<Message> applyTransaction(TransactionContext trx) throws IOException;
-
-  /**
-   * Notify the state machine that the raft peer is no longer leader.
-   */
-  void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/StateMachineStorage.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/StateMachineStorage.java b/raft-server/src/main/java/org/apache/raft/statemachine/StateMachineStorage.java
deleted file mode 100644
index 30005f9..0000000
--- a/raft-server/src/main/java/org/apache/raft/statemachine/StateMachineStorage.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import org.apache.raft.server.storage.RaftStorage;
-import java.io.IOException;
-
-public interface StateMachineStorage {
-
-  void init(RaftStorage raftStorage) throws IOException;
-
-  /**
-   * Returns the information for the latest durable snapshot.
-   */
-  SnapshotInfo getLatestSnapshot();
-
-  // TODO: StateMachine can decide to compact the files independently of concurrent install snapshot
-  // etc requests. We should have ref counting for the SnapshotInfo with a release mechanism
-  // so that raft server will release the files after the snapshot file copy in case a compaction
-  // is waiting for deleting these files.
-
-  void format() throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/TransactionContext.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/TransactionContext.java b/raft-server/src/main/java/org/apache/raft/statemachine/TransactionContext.java
deleted file mode 100644
index 675ada9..0000000
--- a/raft-server/src/main/java/org/apache/raft/statemachine/TransactionContext.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Optional;
-
-/**
- * Context for a transaction.
- * The transaction might have originated from a client request, or it
- * maybe coming from another replica of the state machine through the RAFT log.
- * {@link TransactionContext} can be created from
- * either the {@link StateMachine} or the state machine updater.
- *
- * In the first case, the {@link StateMachine} is a leader. When it receives
- * a {@link StateMachine#startTransaction(RaftClientRequest)} request, it returns
- * a {@link TransactionContext} with the changes from the {@link StateMachine}.
- * The same context will be passed back to the {@link StateMachine}
- * via the {@link StateMachine#applyTransaction(TransactionContext)} call
- * or the {@link StateMachine#notifyNotLeader(Collection)} call.
- *
- * In the second case, the {@link StateMachine} is a follower.
- * The {@link TransactionContext} will be a committed entry coming from
- * the RAFT log from the leader.
- */
-public class TransactionContext {
-
-  /** The {@link StateMachine} that originated the transaction. */
-  private final StateMachine stateMachine;
-
-  /** Original request from the client */
-  private Optional<RaftClientRequest> clientRequest = Optional.empty();
-
-  /** Exception from the {@link StateMachine} or from the log */
-  private Optional<Exception> exception = Optional.empty();
-
-  /** Data from the {@link StateMachine} */
-  private Optional<SMLogEntryProto> smLogEntryProto = Optional.empty();
-
-  /**
-   * Context specific to the state machine.
-   * The {@link StateMachine} can use this object to carry state between
-   * {@link StateMachine#startTransaction(RaftClientRequest)} and
-   * {@link StateMachine#applyTransaction(TransactionContext)}.
-   */
-  private Optional<Object> stateMachineContext = Optional.empty();
-
-  /**
-   * Whether to commit the transaction to the RAFT Log.
-   * In some cases the {@link StateMachine} may want to indicate
-   * that the transaction should not be committed
-   */
-  private boolean shouldCommit = true;
-
-  /** Committed LogEntry. */
-  private Optional<LogEntryProto> logEntry = Optional.empty();
-
-  private TransactionContext(StateMachine stateMachine) {
-    this.stateMachine = stateMachine;
-  }
-
-  /** The same as this(stateMachine, clientRequest, smLogEntryProto, null). */
-  public TransactionContext(
-      StateMachine stateMachine, RaftClientRequest clientRequest,
-      SMLogEntryProto smLogEntryProto) {
-    this(stateMachine, clientRequest, smLogEntryProto, null);
-  }
-
-  /**
-   * Construct a {@link TransactionContext} from a client request.
-   * Used by the state machine to start a transaction
-   * and send the Log entry representing the transaction data
-   * to be applied to the raft log.
-   */
-  public TransactionContext(
-      StateMachine stateMachine, RaftClientRequest clientRequest,
-      SMLogEntryProto smLogEntryProto, Object stateMachineContext) {
-    this(stateMachine);
-    this.clientRequest = Optional.of(clientRequest);
-    this.smLogEntryProto = Optional.ofNullable(smLogEntryProto);
-    this.stateMachineContext = Optional.ofNullable(stateMachineContext);
-  }
-
-  /** The same as this(stateMachine, clientRequest, exception, null). */
-  public TransactionContext(
-      StateMachine stateMachine, RaftClientRequest clientRequest,
-      Exception exception) {
-    this(stateMachine, clientRequest, exception, null);
-  }
-
-  /**
-   * Construct a {@link TransactionContext} from a client request to signal
-   * an exception so that the RAFT server will fail the request on behalf
-   * of the {@link StateMachine}.
-   */
-  public TransactionContext(
-      StateMachine stateMachine, RaftClientRequest clientRequest,
-      Exception exception, Object stateMachineContext) {
-    this(stateMachine);
-    this.clientRequest = Optional.of(clientRequest);
-    this.exception = Optional.of(exception);
-    this.stateMachineContext = Optional.ofNullable(stateMachineContext);
-  }
-
-  /**
-   * Construct a {@link TransactionContext} from a {@link LogEntryProto}.
-   * Used by followers for applying committed entries to the state machine.
-   * @param logEntry the log entry to be applied
-   */
-  public TransactionContext(StateMachine stateMachine, LogEntryProto logEntry) {
-    this(stateMachine);
-    this.smLogEntryProto = Optional.of(logEntry.getSmLogEntry());
-    this.logEntry = Optional.of(logEntry);
-  }
-
-  public Optional<RaftClientRequest> getClientRequest() {
-    return this.clientRequest;
-  }
-
-  public Optional<SMLogEntryProto> getSMLogEntry() {
-    return this.smLogEntryProto;
-  }
-
-  public Optional<Exception> getException() {
-    return this.exception;
-  }
-
-  public TransactionContext setStateMachineContext(Object stateMachineContext) {
-    this.stateMachineContext = Optional.ofNullable(stateMachineContext);
-    return this;
-  }
-
-  public Optional<Object> getStateMachineContext() {
-    return stateMachineContext;
-  }
-
-  public TransactionContext setLogEntry(LogEntryProto logEntry) {
-    this.logEntry = Optional.of(logEntry);
-    return this;
-  }
-
-  public TransactionContext setSmLogEntryProto(SMLogEntryProto smLogEntryProto) {
-    this.smLogEntryProto = Optional.of(smLogEntryProto);
-    return this;
-  }
-
-  public Optional<LogEntryProto> getLogEntry() {
-    return logEntry;
-  }
-
-  private TransactionContext setException(IOException ioe) {
-    assert !this.exception.isPresent();
-    this.exception = Optional.of(ioe);
-    return this;
-  }
-
-  public TransactionContext setShouldCommit(boolean shouldCommit) {
-    this.shouldCommit = shouldCommit;
-    return this;
-  }
-
-  public boolean shouldCommit() {
-    // TODO: Hook this up in the server to bypass the RAFT Log and send back a response to client
-    return this.shouldCommit;
-  }
-
-  // proxy StateMachine methods. We do not want to expose the SM to the RaftLog
-
-  /**
-   * This is called before the transaction passed from the StateMachine is appended to the raft log.
-   * This method will be called from log append and having the same strict serial order that the
-   * Transactions will have in the RAFT log. Since this is called serially in the critical path of
-   * log append, it is important to do only required operations here.
-   * @return The Transaction context.
-   */
-  public TransactionContext preAppendTransaction() throws IOException {
-    return stateMachine.preAppendTransaction(this);
-  }
-
-  /**
-   * Called to notify the state machine that the Transaction passed cannot be appended (or synced).
-   * The exception field will indicate whether there was an exception or not.
-   * @return cancelled transaction
-   */
-  public TransactionContext cancelTransaction() throws IOException {
-    // TODO: This is not called from Raft server / log yet. When an IOException happens, we should
-    // call this to let the SM know that Transaction cannot be synced
-    return stateMachine.cancelTransaction(this);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java b/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java
deleted file mode 100644
index c66ef8f..0000000
--- a/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java
+++ /dev/null
@@ -1,444 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.client.RaftClient;
-import org.apache.raft.client.RaftClientRequestSender;
-import org.apache.raft.client.impl.RaftClientImpl;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.server.RaftServerConfigKeys;
-import org.apache.raft.server.impl.DelayLocalExecutionInjection;
-import org.apache.raft.server.impl.RaftConfiguration;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.server.RaftServerRpc;
-import org.apache.raft.server.storage.MemoryRaftLog;
-import org.apache.raft.server.storage.RaftLog;
-import org.apache.raft.statemachine.BaseStateMachine;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.util.ExitUtils;
-import org.apache.raft.util.FileUtils;
-import org.apache.raft.util.RaftUtils;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.stream.Collectors;
-
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT;
-
-public abstract class MiniRaftCluster {
-  public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class);
-  public static final DelayLocalExecutionInjection logSyncDelay =
-      new DelayLocalExecutionInjection(RaftLog.LOG_SYNC);
-
-  public static final String CLASS_NAME = MiniRaftCluster.class.getSimpleName();
-  public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class";
-  public static final Class<? extends StateMachine> STATEMACHINE_CLASS_DEFAULT = BaseStateMachine.class;
-
-  public static abstract class Factory<CLUSTER extends MiniRaftCluster> {
-    public abstract CLUSTER newCluster(
-        String[] ids, RaftProperties prop, boolean formatted)
-        throws IOException;
-
-    public CLUSTER newCluster(
-        int numServer, RaftProperties prop, boolean formatted)
-        throws IOException {
-      return newCluster(generateIds(numServer, 0), prop, formatted);
-    }
-  }
-
-  public static abstract class RpcBase extends MiniRaftCluster {
-    public RpcBase(String[] ids, RaftProperties properties, boolean formatted) {
-      super(ids, properties, formatted);
-    }
-
-    protected abstract RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException;
-
-    @Override
-    protected void setPeerRpc() throws IOException {
-      for (RaftPeer p : conf.getPeers()) {
-        setPeerRpc(p);
-      }
-    }
-
-    @Override
-    public void restartServer(String id, boolean format) throws IOException {
-      super.restartServer(id, format);
-      setPeerRpc(conf.getPeer(id)).start();
-    }
-
-    @Override
-    public void setBlockRequestsFrom(String src, boolean block) {
-      RaftTestUtil.setBlockRequestsFrom(src, block);
-    }
-  }
-
-  public static class PeerChanges {
-    public final RaftPeer[] allPeersInNewConf;
-    public final RaftPeer[] newPeers;
-    public final RaftPeer[] removedPeers;
-
-    public PeerChanges(RaftPeer[] all, RaftPeer[] newPeers, RaftPeer[] removed) {
-      this.allPeersInNewConf = all;
-      this.newPeers = newPeers;
-      this.removedPeers = removed;
-    }
-  }
-
-  public static RaftConfiguration initConfiguration(String[] ids) {
-    return RaftConfiguration.newBuilder()
-        .setConf(Arrays.stream(ids).map(RaftPeer::new).collect(Collectors.toList()))
-        .build();
-  }
-
-  private static String getBaseDirectory() {
-    return System.getProperty("test.build.data", "target/test/data") + "/raft/";
-  }
-
-  private static void formatDir(String dirStr) {
-    final File serverDir = new File(dirStr);
-    Preconditions.checkState(FileUtils.fullyDelete(serverDir),
-        "Failed to format directory %s", dirStr);
-    LOG.info("Formatted directory {}", dirStr);
-  }
-
-  public static String[] generateIds(int numServers, int base) {
-    String[] ids = new String[numServers];
-    for (int i = 0; i < numServers; i++) {
-      ids[i] = "s" + (i + base);
-    }
-    return ids;
-  }
-
-  protected RaftConfiguration conf;
-  protected final RaftProperties properties;
-  private final String testBaseDir;
-  protected final Map<String, RaftServerImpl> servers =
-      Collections.synchronizedMap(new LinkedHashMap<>());
-
-  public MiniRaftCluster(String[] ids, RaftProperties properties,
-      boolean formatted) {
-    this.conf = initConfiguration(ids);
-    this.properties = new RaftProperties(properties);
-    this.testBaseDir = getBaseDirectory();
-
-    conf.getPeers().forEach(
-        p -> servers.put(p.getId(), newRaftServer(p.getId(), conf, formatted)));
-
-    ExitUtils.disableSystemExit();
-  }
-
-  protected <RPC extends  RaftServerRpc> void init(Map<RaftPeer, RPC> peers) {
-    LOG.info("peers = " + peers.keySet());
-    conf = RaftConfiguration.newBuilder().setConf(peers.keySet()).build();
-    for (Map.Entry<RaftPeer, RPC> entry : peers.entrySet()) {
-      final RaftServerImpl server = servers.get(entry.getKey().getId());
-      server.setInitialConf(conf);
-      server.setServerRpc(entry.getValue());
-    }
-  }
-
-  public void start() {
-    LOG.info("Starting " + getClass().getSimpleName());
-    servers.values().forEach(RaftServerImpl::start);
-  }
-
-  /**
-   * start a stopped server again.
-   */
-  public void restartServer(String id, boolean format) throws IOException {
-    killServer(id);
-    servers.remove(id);
-    servers.put(id, newRaftServer(id, conf, format));
-  }
-
-  public final void restart(boolean format) throws IOException {
-    servers.values().stream().filter(RaftServerImpl::isAlive)
-        .forEach(RaftServerImpl::close);
-    List<String> idList = new ArrayList<>(servers.keySet());
-    for (String id : idList) {
-      servers.remove(id);
-      servers.put(id, newRaftServer(id, conf, format));
-    }
-
-    setPeerRpc();
-    start();
-  }
-
-  protected abstract void setPeerRpc() throws IOException;
-
-  public int getMaxTimeout() {
-    return properties.getInt(
-        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY,
-        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT);
-  }
-
-  public RaftConfiguration getConf() {
-    return conf;
-  }
-
-  private RaftServerImpl newRaftServer(String id, RaftConfiguration conf,
-                                       boolean format) {
-    final RaftServerImpl s;
-    try {
-      final String dirStr = testBaseDir + id;
-      if (format) {
-        formatDir(dirStr);
-      }
-      properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, dirStr);
-      s = new RaftServerImpl(id, conf, properties, getStateMachine4Test(properties));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return s;
-  }
-
-  static StateMachine getStateMachine4Test(RaftProperties properties) {
-    final Class<? extends StateMachine> smClass = properties.getClass(
-        STATEMACHINE_CLASS_KEY,
-        STATEMACHINE_CLASS_DEFAULT,
-        StateMachine.class);
-    return RaftUtils.newInstance(smClass);
-  }
-
-  public abstract RaftClientRequestSender getRaftClientRequestSender();
-
-  protected <RPC extends RaftServerRpc> Collection<RaftPeer> addNewPeers(
-      Map<RaftPeer, RPC> newPeers, Collection<RaftServerImpl> newServers,
-      boolean startService) throws IOException {
-    for (Map.Entry<RaftPeer, RPC> entry : newPeers.entrySet()) {
-      RaftServerImpl server = servers.get(entry.getKey().getId());
-      server.setServerRpc(entry.getValue());
-    }
-    if (startService) {
-      newServers.forEach(RaftServerImpl::start);
-    }
-    return new ArrayList<>(newPeers.keySet());
-  }
-
-  protected abstract Collection<RaftPeer> addNewPeers(
-      Collection<RaftPeer> newPeers, Collection<RaftServerImpl> newServers,
-      boolean startService) throws IOException;
-
-  /**
-   * Adds {@code number} new peers whose ids come from
-   * {@code generateIds(number, servers.size())}.
-   *
-   * @param number how many peers to add
-   * @param startNewPeer forwarded to {@code addNewPeers(String[], boolean)}
-   */
-  public PeerChanges addNewPeers(int number, boolean startNewPeer)
-      throws IOException {
-    final String[] generatedIds = generateIds(number, servers.size());
-    return addNewPeers(generatedIds, startNewPeer);
-  }
-
-  /**
-   * Creates one new server per id, registers it in this cluster, and replaces
-   * the cluster's conf with one containing the new peers followed by the peers
-   * of the previous conf (log entry index 0).
-   *
-   * @param ids ids of the peers to add; none may already be registered and no
-   *            id may appear twice
-   * @param startNewPeer forwarded to the RPC-specific {@code addNewPeers} hook
-   * @return the changes: all peers of the new conf, the newly added peers, and
-   *         an empty third array
-   * @throws IllegalArgumentException if an id is already in use; nothing has
-   *         been created or formatted when this is thrown
-   */
-  public PeerChanges addNewPeers(String[] ids,
-      boolean startNewPeer) throws IOException {
-    LOG.info("Add new peers {}", Arrays.asList(ids));
-
-    // Validate every id BEFORE creating any server: newRaftServer(.., true)
-    // formats the storage directory, so creating first would wipe the
-    // directory of an existing server that has the same id.
-    Collection<RaftPeer> newPeers = new ArrayList<>(ids.length);
-    final List<String> acceptedIds = new ArrayList<>(ids.length);
-    for (String id : ids) {
-      final RaftPeer peer = new RaftPeer(id);
-      final String peerId = peer.getId();
-      Preconditions.checkArgument(
-          !servers.containsKey(peerId) && !acceptedIds.contains(peerId),
-          "Server id %s is already in use", peerId);
-      acceptedIds.add(peerId);
-      newPeers.add(peer);
-    }
-
-    // create and add new RaftServers
-    final List<RaftServerImpl> newServers = new ArrayList<>(ids.length);
-    for (RaftPeer p : newPeers) {
-      final RaftServerImpl newServer = newRaftServer(p.getId(), conf, true);
-      servers.put(p.getId(), newServer);
-      newServers.add(newServer);
-    }
-
-    // for hadoop-rpc-enabled peer, we assign inetsocketaddress here.
-    // Copy the result: the hook is abstract and need not return a mutable
-    // collection, but the old peers are appended to it below.
-    newPeers = new ArrayList<>(addNewPeers(newPeers, newServers, startNewPeer));
-
-    final RaftPeer[] np = newPeers.toArray(new RaftPeer[newPeers.size()]);
-    newPeers.addAll(conf.getPeers());
-    conf = RaftConfiguration.newBuilder().setConf(newPeers).setLogEntryIndex(0).build();
-    RaftPeer[] p = newPeers.toArray(new RaftPeer[newPeers.size()]);
-    return new PeerChanges(p, np, new RaftPeer[0]);
-  }
-
-  /**
-   * Starts the server registered under {@code id}.
-   *
-   * @param id id of a server in this cluster (asserted to exist)
-   */
-  public void startServer(String id) {
-    final RaftServerImpl target = servers.get(id);
-    assert target != null;
-    target.start();
-  }
-
-  /** Builds a {@link RaftPeer} from a server's id and its RPC socket address. */
-  private RaftPeer getPeer(RaftServerImpl s) {
-    final String serverId = s.getId();
-    return new RaftPeer(serverId, s.getServerRpc().getInetSocketAddress());
-  }
-
-  /**
-   * Prepares the peer list that results from removing some peers from the
-   * conf, and installs it as this cluster's conf (log entry index 0).
-   *
-   * @param number how many peers to remove; when {@code removeLeader} is set,
-   *               the leader is removed and at most {@code number - 1}
-   *               followers in addition
-   * @param removeLeader whether the current leader is removed
-   * @param excluded peers that are never picked among the followers
-   * @return the changes: the remaining peers, an empty second array, and the
-   *         removed peers
-   */
-  public PeerChanges removePeers(int number, boolean removeLeader,
-      Collection<RaftPeer> excluded) {
-    final Collection<RaftPeer> remaining = new ArrayList<>(conf.getPeers());
-    final List<RaftPeer> dropped = new ArrayList<>(number);
-    if (removeLeader) {
-      final RaftPeer leader = getPeer(getLeader());
-      assert !excluded.contains(leader);
-      remaining.remove(leader);
-      dropped.add(leader);
-    }
-
-    // The leader, when removed, counts toward the requested number.
-    final int followerQuota = removeLeader ? number - 1 : number;
-    int taken = 0;
-    for (RaftServerImpl follower : getFollowers()) {
-      if (taken >= followerQuota) {
-        break;
-      }
-      final RaftPeer candidate = getPeer(follower);
-      if (excluded.contains(candidate)) {
-        continue;
-      }
-      remaining.remove(candidate);
-      dropped.add(candidate);
-      taken++;
-    }
-
-    conf = RaftConfiguration.newBuilder().setConf(remaining).setLogEntryIndex(0).build();
-    final RaftPeer[] all = remaining.toArray(new RaftPeer[remaining.size()]);
-    final RaftPeer[] removed = dropped.toArray(new RaftPeer[dropped.size()]);
-    return new PeerChanges(all, new RaftPeer[0], removed);
-  }
-
-  /** Closes the server registered under {@code id}. */
-  public void killServer(String id) {
-    final RaftServerImpl victim = servers.get(id);
-    victim.close();
-  }
-
-  /**
-   * @return a multi-line string: a header with the number of servers followed
-   *         by one indented line per server.
-   */
-  public String printServers() {
-    final StringBuilder out = new StringBuilder();
-    out.append("\n#servers = ").append(servers.size()).append("\n");
-    for (RaftServerImpl server : servers.values()) {
-      out.append("  ").append(server).append("\n");
-    }
-    return out.toString();
-  }
-
-  /**
-   * Like {@code printServers}, but additionally appends the entry string of
-   * each server whose log is a {@link MemoryRaftLog}.
-   */
-  public String printAllLogs() {
-    final StringBuilder out = new StringBuilder();
-    out.append("\n#servers = ").append(servers.size()).append("\n");
-    for (RaftServerImpl server : servers.values()) {
-      out.append("  ").append(server).append("\n");
-
-      final RaftLog raftLog = server.getState().getLog();
-      if (!(raftLog instanceof MemoryRaftLog)) {
-        continue;
-      }
-      out.append("    ");
-      out.append(((MemoryRaftLog) raftLog).getEntryString());
-    }
-    return out.toString();
-  }
-
-  /**
-   * Finds the alive server that reports itself as leader with the highest
-   * current term.
-   *
-   * @return that server, or null when no alive server is a leader; the test is
-   *         failed via {@code Assert.fail} when several leaders share the
-   *         highest term
-   */
-  public RaftServerImpl getLeader() {
-    final List<RaftServerImpl> leaders = new ArrayList<>();
-    for (RaftServerImpl s : servers.values()) {
-      if (!s.isAlive() || !s.isLeader()) {
-        continue;
-      }
-      if (!leaders.isEmpty()) {
-        final long leaderTerm = leaders.get(0).getState().getCurrentTerm();
-        final long term = s.getState().getCurrentTerm();
-        if (term < leaderTerm) {
-          // stale leader from an older term: ignore it
-          continue;
-        }
-        if (term > leaderTerm) {
-          // everything collected so far belongs to an older term
-          leaders.clear();
-        }
-      }
-      leaders.add(s);
-    }
-
-    if (leaders.isEmpty()) {
-      return null;
-    }
-    if (leaders.size() != 1) {
-      Assert.fail(printServers() + leaders.toString()
-          + "leaders.size() = " + leaders.size() + " != 1");
-    }
-    return leaders.get(0);
-  }
-
-  /**
-   * @param leaderId id to compare against the current leader
-   * @return true iff {@code getLeader()} is non-null and has this id
-   */
-  public boolean isLeader(String leaderId) throws InterruptedException {
-    final RaftServerImpl current = getLeader();
-    if (current == null) {
-      return false;
-    }
-    return current.getId().equals(leaderId);
-  }
-
-  /** @return a new list of the servers that are alive and report being a follower. */
-  public List<RaftServerImpl> getFollowers() {
-    final List<RaftServerImpl> followers = new ArrayList<>();
-    for (RaftServerImpl s : servers.values()) {
-      if (s.isAlive() && s.isFollower()) {
-        followers.add(s);
-      }
-    }
-    return followers;
-  }
-
-  /**
-   * @return the servers of this cluster, exactly as returned by
-   *         {@code servers.values()} (no copy is made).
-   */
-  public Collection<RaftServerImpl> getServers() {
-    return this.servers.values();
-  }
-
-  /** @return the result of looking up {@code id} among this cluster's servers. */
-  public RaftServerImpl getServer(String id) {
-    return this.servers.get(id);
-  }
-
-  /**
-   * @return one {@link RaftPeer} per server of this cluster, each built from
-   *         the server's id and its RPC socket address.
-   */
-  public Collection<RaftPeer> getPeers() {
-    final List<RaftPeer> peers = new ArrayList<>();
-    for (RaftServerImpl s : getServers()) {
-      peers.add(getPeer(s));
-    }
-    return peers;
-  }
-
-  /**
-   * Creates a client over the peers of the current conf, using this cluster's
-   * request sender and properties.
-   *
-   * @param clientId id given to the new client
-   * @param leaderId leader id passed to the client; callers in the tests also
-   *                 pass null
-   */
-  public RaftClient createClient(String clientId, String leaderId) {
-    return new RaftClientImpl(
-        clientId,
-        conf.getPeers(),
-        getRaftClientRequestSender(),
-        leaderId,
-        properties);
-  }
-
-  /**
-   * Closes every server that is still alive, then fails with an
-   * {@link AssertionError} if {@code ExitUtils} reports a termination.
-   */
-  public void shutdown() {
-    LOG.info("Stopping " + getClass().getSimpleName());
-    for (RaftServerImpl s : servers.values()) {
-      if (s.isAlive()) {
-        s.close();
-      }
-    }
-
-    if (!ExitUtils.isTerminated()) {
-      return;
-    }
-    LOG.error("Test resulted in an unexpected exit",
-        ExitUtils.getFirstExitException());
-    throw new AssertionError("Test resulted in an unexpected exit");
-  }
-
-  /**
-   * Block all the incoming requests for the peer with leaderId. Also delay
-   * outgoing or incoming msg for all other peers.
-   *
-   * @param leaderId id of the peer whose incoming requests are blocked
-   * @param delayMs delay applied to the other peers' messages;
-   *                {@code tryEnforceLeader} passes 0 to reopen the queues
-   */
-  protected abstract void blockQueueAndSetDelay(String leaderId, int delayMs)
-      throws InterruptedException;
-
-  /**
-   * Attempts to make the given server the leader of the cluster.
-   * @param leaderId ID of the server that should become the leader.
-   * @return true if that server is the leader when this method returns, false
-   *         otherwise.
-   */
-  public boolean tryEnforceLeader(String leaderId) throws InterruptedException {
-    // Intervene only when the target is not the leader already.
-    if (!isLeader(leaderId)) {
-      // Slow down the RPC read path of all other servers so that a read takes
-      // at least ELECTION_TIMEOUT_MIN, and block the target's own incoming
-      // RPC so that it requests a vote which the non-leaders can grant.
-      blockQueueAndSetDelay(leaderId, RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT);
-
-      // Open the queues again so that the vote can make progress.
-      blockQueueAndSetDelay(leaderId, 0);
-
-      return isLeader(leaderId);
-    }
-    return true;
-  }
-
-  /**
-   * Block/unblock the requests sent from the given source.
-   *
-   * @param src id of the sending server
-   * @param block true to block its requests, false to unblock them
-   */
-  public abstract void setBlockRequestsFrom(String src, boolean block);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java b/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java
deleted file mode 100644
index ed40bde..0000000
--- a/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft;
-
-import org.apache.raft.RaftTestUtil.SimpleMessage;
-import org.apache.raft.client.RaftClient;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.junit.*;
-import org.junit.rules.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static org.apache.raft.RaftTestUtil.waitAndKillLeader;
-import static org.apache.raft.RaftTestUtil.waitForLeader;
-
-/**
- * Basic Raft tests (leader election, appending entries, enforcing a leader,
- * and sending under load while the leader changes) that run against whatever
- * {@link MiniRaftCluster} the concrete subclass returns from
- * {@link #getCluster()}.
- */
-public abstract class RaftBasicTests {
-  public static final Logger LOG = LoggerFactory.getLogger(RaftBasicTests.class);
-
-  public static final int NUM_SERVERS = 5;
-
-  protected static final RaftProperties properties = new RaftProperties();
-
-  /** @return the cluster under test; supplied by the concrete subclass. */
-  public abstract MiniRaftCluster getCluster();
-
-  public RaftProperties getProperties() {
-    return properties;
-  }
-
-  @Rule
-  public Timeout globalTimeout = new Timeout(120 * 1000);
-
-  /** Checks that no leader exists yet and starts the cluster. */
-  @Before
-  public void setup() throws IOException {
-    Assert.assertNull(getCluster().getLeader());
-    getCluster().start();
-  }
-
-  @After
-  public void tearDown() {
-    final MiniRaftCluster cluster = getCluster();
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  @Test
-  public void testBasicLeaderElection() throws Exception {
-    LOG.info("Running testBasicLeaderElection");
-    final MiniRaftCluster cluster = getCluster();
-    waitAndKillLeader(cluster, true);
-    waitAndKillLeader(cluster, true);
-    waitAndKillLeader(cluster, true);
-    waitAndKillLeader(cluster, false);
-  }
-
-  /**
-   * Kills one follower, sends ten messages, and checks the log of every server
-   * that is still alive against those messages.
-   */
-  @Test
-  public void testBasicAppendEntries() throws Exception {
-    LOG.info("Running testBasicAppendEntries");
-    final MiniRaftCluster cluster = getCluster();
-    RaftServerImpl leader = waitForLeader(cluster);
-    final long term = leader.getState().getCurrentTerm();
-    final String killed = cluster.getFollowers().get(3).getId();
-    cluster.killServer(killed);
-    LOG.info(cluster.printServers());
-
-    final SimpleMessage[] messages = SimpleMessage.create(10);
-    try(final RaftClient client = cluster.createClient("client", null)) {
-      for (SimpleMessage message : messages) {
-        client.send(message);
-      }
-    }
-
-    Thread.sleep(cluster.getMaxTimeout() + 100);
-    LOG.info(cluster.printAllLogs());
-
-    cluster.getServers().stream().filter(RaftServerImpl::isAlive)
-        .map(s -> s.getState().getLog().getEntries(1, Long.MAX_VALUE))
-        .forEach(e -> RaftTestUtil.assertLogEntries(e, 1, term, messages));
-  }
-
-  @Test
-  public void testEnforceLeader() throws Exception {
-    LOG.info("Running testEnforceLeader");
-    final String leader = "s" + ThreadLocalRandom.current().nextInt(NUM_SERVERS);
-    LOG.info("enforce leader to " + leader);
-    final MiniRaftCluster cluster = getCluster();
-    waitForLeader(cluster);
-    waitForLeader(cluster, leader);
-  }
-
-  /**
-   * A thread that sends a fixed array of messages through one client, counting
-   * its progress in {@link #step} and recording a failure in
-   * {@link #exceptionInClientThread}.
-   */
-  static class Client4TestWithLoad extends Thread {
-    final RaftClient client;
-    final SimpleMessage[] messages;
-
-    final AtomicInteger step = new AtomicInteger();
-    volatile Exception exceptionInClientThread;
-
-    Client4TestWithLoad(RaftClient client, int numMessages) {
-      this.client = client;
-      this.messages = SimpleMessage.create(numMessages, client.getId());
-    }
-
-    /** @return true while messages remain to be sent and no failure was recorded. */
-    boolean isRunning() {
-      return step.get() < messages.length && exceptionInClientThread == null;
-    }
-
-    @Override
-    public void run() {
-      // try-with-resources: the client is closed even when send() fails.
-      // Unchecked exceptions are recorded as well; otherwise isRunning() would
-      // stay true for a dead thread and testWithLoad would spin until the
-      // global timeout instead of reporting the real failure.
-      try (final RaftClient c = client) {
-        while (isRunning()) {
-          c.send(messages[step.getAndIncrement()]);
-        }
-      } catch (IOException | RuntimeException e) {
-        exceptionInClientThread = e;
-      }
-    }
-  }
-
-  @Test
-  public void testWithLoad() throws Exception {
-    testWithLoad(10, 500);
-  }
-
-  /**
-   * Runs {@code numClients} client threads that each send {@code numMessages}
-   * messages, changing the leader whenever the clients together advanced by at
-   * least {@code 50 * numClients} steps, and finally checks every client's
-   * messages against the servers' logs.
-   */
-  private void testWithLoad(final int numClients, final int numMessages)
-      throws Exception {
-    LOG.info("Running testWithLoad: numClients=" + numClients
-        + ", numMessages=" + numMessages);
-
-    final MiniRaftCluster cluster = getCluster();
-    LOG.info(cluster.printServers());
-
-    final List<Client4TestWithLoad> clients
-        = Stream.iterate(0, i -> i+1).limit(numClients)
-        .map(i -> cluster.createClient(String.valueOf((char)('a' + i)), null))
-        .map(c -> new Client4TestWithLoad(c, numMessages))
-        .collect(Collectors.toList());
-    clients.forEach(Thread::start);
-
-    int count = 0;
-    int lastStep = 0;
-    while (clients.stream().anyMatch(Client4TestWithLoad::isRunning)) {
-      final int n = clients.stream().mapToInt(c -> c.step.get()).sum();
-      if (n - lastStep < 50 * numClients) { // Change leader at least 50 steps.
-        Thread.sleep(10);
-        continue;
-      }
-      lastStep = n;
-      count++;
-
-      RaftServerImpl leader = cluster.getLeader();
-      if (leader != null) {
-        final String oldLeader = leader.getId();
-        LOG.info("Block all requests sent by leader " + oldLeader);
-        String newLeader = RaftTestUtil.changeLeader(cluster, oldLeader);
-        LOG.info("Changed leader from " + oldLeader + " to " + newLeader);
-        Assert.assertNotEquals(oldLeader, newLeader);
-      }
-    }
-
-    for(Client4TestWithLoad c : clients) {
-      c.join();
-    }
-    for(Client4TestWithLoad c : clients) {
-      if (c.exceptionInClientThread != null) {
-        throw new AssertionError(c.exceptionInClientThread);
-      }
-      RaftTestUtil.assertLogEntries(cluster.getServers(), c.messages);
-    }
-
-    LOG.info("Leader change count=" + count + cluster.printAllLogs());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java b/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java
deleted file mode 100644
index 195cbec..0000000
--- a/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft;
-
-import org.apache.log4j.Level;
-import org.apache.raft.RaftTestUtil.SimpleMessage;
-import org.apache.raft.client.RaftClient;
-import org.apache.raft.client.RaftClientRequestSender;
-import org.apache.raft.client.impl.RaftClientImpl;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.server.simulation.RequestHandler;
-import org.apache.raft.server.storage.RaftLog;
-import org.apache.raft.util.RaftUtils;
-import org.junit.*;
-import org.junit.rules.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-import static org.apache.raft.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
-
-/**
- * Tests the not-leader reply a client gets when it sends a request to a
- * server that is no longer the leader, with and without a concurrent
- * configuration change. Subclasses supply the cluster via
- * {@link #initCluster()}.
- */
-public abstract class RaftNotLeaderExceptionBaseTest {
-  static {
-    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
-    RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
-    RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
-    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
-  }
-
-  public static final Logger LOG =
-      LoggerFactory.getLogger(RaftNotLeaderExceptionBaseTest.class);
-  public static final int NUM_PEERS = 3;
-
-  @Rule
-  public Timeout globalTimeout = new Timeout(60 * 1000);
-
-  private MiniRaftCluster cluster;
-
-  /** @return the cluster to test; it is started by {@link #setup()}. */
-  public abstract MiniRaftCluster initCluster() throws IOException;
-
-  @Before
-  public void setup() throws IOException {
-    this.cluster = initCluster();
-    cluster.start();
-  }
-
-  @After
-  public void tearDown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  /**
-   * Sends {@code message} straight to {@code serverId} through the client's
-   * request sender, bypassing the client's own send path. An
-   * {@link IOException} is followed by a one-second sleep and a retry, up to
-   * ten attempts in total (the remote rpc server may not be ready yet).
-   *
-   * @return the first non-null reply, or null if all attempts failed
-   */
-  private static RaftClientReply sendDirectlyWithRetry(RaftClient client,
-      String serverId, String message) throws Exception {
-    final RaftClientRequestSender rpc = ((RaftClientImpl)client).getRequestSender();
-    RaftClientReply reply = null;
-    for (int i = 0; reply == null && i < 10; i++) {
-      try {
-        reply = rpc.sendRequest(
-            new RaftClientRequest("client", serverId, DEFAULT_SEQNUM,
-                new SimpleMessage(message)));
-      } catch (IOException ignored) {
-        Thread.sleep(1000);
-      }
-    }
-    return reply;
-  }
-
-  @Test
-  public void testHandleNotLeaderException() throws Exception {
-    RaftTestUtil.waitForLeader(cluster);
-    final String leaderId = cluster.getLeader().getId();
-    // try-with-resources: the client is closed even when an assertion fails.
-    try(final RaftClient client = cluster.createClient("client", leaderId)) {
-      RaftClientReply reply = client.send(new SimpleMessage("m1"));
-      Assert.assertTrue(reply.isSuccess());
-
-      // enforce leader change
-      String newLeader = RaftTestUtil.changeLeader(cluster, leaderId);
-      Assert.assertNotEquals(leaderId, newLeader);
-
-      // the old leader must answer with a not-leader reply naming the new one
-      reply = sendDirectlyWithRetry(client, leaderId, "m2");
-      Assert.assertNotNull(reply);
-      Assert.assertFalse(reply.isSuccess());
-      Assert.assertTrue(reply.isNotLeader());
-      Assert.assertEquals(newLeader,
-          reply.getNotLeaderException().getSuggestedLeader().getId());
-
-      reply = client.send(new SimpleMessage("m3"));
-      Assert.assertTrue(reply.isSuccess());
-    }
-  }
-
-  @Test
-  public void testNotLeaderExceptionWithReconf() throws Exception {
-    Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster));
-
-    final String leaderId = cluster.getLeader().getId();
-    // try-with-resources: the client is closed even when an assertion fails.
-    try(final RaftClient client = cluster.createClient("client", leaderId)) {
-      // enforce leader change
-      String newLeader = RaftTestUtil.changeLeader(cluster, leaderId);
-      Assert.assertNotEquals(leaderId, newLeader);
-
-      // also add two new peers
-      MiniRaftCluster.PeerChanges change = cluster.addNewPeers(
-          new String[]{"ss1", "ss2"}, true);
-      // trigger setConfiguration
-      LOG.info("Start changing the configuration: {}",
-          Arrays.asList(change.allPeersInNewConf));
-      try(final RaftClient c2 = cluster.createClient("client2", newLeader)) {
-        RaftClientReply reply = c2.setConfiguration(change.allPeersInNewConf);
-        Assert.assertTrue(reply.isSuccess());
-      }
-      LOG.info(cluster.printServers());
-
-      // it is possible that the remote peer's rpc server is not ready. need retry
-      RaftClientReply reply = sendDirectlyWithRetry(client, leaderId, "m1");
-      Assert.assertNotNull(reply);
-      Assert.assertFalse(reply.isSuccess());
-      Assert.assertTrue(reply.isNotLeader());
-      Assert.assertEquals(newLeader,
-          reply.getNotLeaderException().getSuggestedLeader().getId());
-      // the reply must carry exactly the peers of the cluster
-      Collection<RaftPeer> peers = cluster.getPeers();
-      RaftPeer[] peersFromReply = reply.getNotLeaderException().getPeers();
-      Assert.assertEquals(peers.size(), peersFromReply.length);
-      for (RaftPeer p : peersFromReply) {
-        Assert.assertTrue(peers.contains(p));
-      }
-
-      reply = client.send(new SimpleMessage("m2"));
-      Assert.assertTrue(reply.isSuccess());
-    }
-  }
-}