You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/01/31 21:17:03 UTC
[18/54] [abbrv] incubator-ratis git commit: Renamed the packages from
raft to ratis in preparation for Apache Incubation - Moved all java packages
from org.apache.raft to org.apache.ratis. - Moved native package to
org_apache_ratis, and native lib to l
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
new file mode 100644
index 0000000..9b2932c
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.io.Charsets;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.impl.ConfigurationManager;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.util.AutoCloseableLock;
+import org.apache.ratis.util.CodeInjectionForTesting;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * The RaftLog implementation that writes log entries into segmented files in
+ * local disk.
+ *
+ * The max log segment size is 8MB. The real log segment size may not be
+ * exactly equal to this limit. If a log entry's size exceeds 8MB, this entry
+ * will be stored in a single segment.
+ *
+ * There are two types of segments: closed segment and open segment. The former
+ * is named as "log_startindex-endindex", the later is named as
+ * "log_inprogress_startindex".
+ *
+ * There can be multiple closed segments but there is at most one open segment.
+ * When the open segment reaches the size limit, or the log term increases, we
+ * close the open segment and start a new open segment. A closed segment cannot
+ * be appended anymore, but it can be truncated in case that a follower's log is
+ * inconsistent with the current leader.
+ *
+ * Every closed segment should be non-empty, i.e., it should contain at least
+ * one entry.
+ *
+ * There should not be any gap between segments. The first segment may not start
+ * from index 0 since there may be snapshots as log compaction. The last index
+ * in segments should be no smaller than the last index of snapshot, otherwise
+ * we may have hole when append further log.
+ */
+public class SegmentedRaftLog extends RaftLog {
+ static final String HEADER_STR = "RAFTLOG1";
+ static final byte[] HEADER_BYTES = HEADER_STR.getBytes(Charsets.UTF_8);
+
+ /**
+ * I/O task definitions.
+ */
+ static abstract class Task {
+ private boolean done = false;
+
+ synchronized void done() {
+ done = true;
+ notifyAll();
+ }
+
+ synchronized void waitForDone() throws InterruptedException {
+ while (!done) {
+ wait();
+ }
+ }
+
+ abstract void execute() throws IOException;
+
+ abstract long getEndIndex();
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "-" + getEndIndex();
+ }
+ }
+ private static final ThreadLocal<Task> myTask = new ThreadLocal<>();
+
+ private final RaftStorage storage;
+ private final RaftLogCache cache;
+ private final RaftLogWorker fileLogWorker;
+ private final long segmentMaxSize;
+
+ public SegmentedRaftLog(String selfId, RaftServerImpl server, RaftStorage storage,
+ long lastIndexInSnapshot, RaftProperties properties) throws IOException {
+ super(selfId);
+ this.storage = storage;
+ this.segmentMaxSize = properties.getLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY,
+ RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT);
+ cache = new RaftLogCache();
+ fileLogWorker = new RaftLogWorker(server, storage, properties);
+ lastCommitted.set(lastIndexInSnapshot);
+ }
+
+ @Override
+ public void open(ConfigurationManager confManager, long lastIndexInSnapshot)
+ throws IOException {
+ loadLogSegments(confManager, lastIndexInSnapshot);
+ File openSegmentFile = null;
+ if (cache.getOpenSegment() != null) {
+ openSegmentFile = storage.getStorageDir()
+ .getOpenLogFile(cache.getOpenSegment().getStartIndex());
+ }
+ fileLogWorker.start(Math.max(cache.getEndIndex(), lastIndexInSnapshot),
+ openSegmentFile);
+ super.open(confManager, lastIndexInSnapshot);
+ }
+
+ @Override
+ public long getStartIndex() {
+ return cache.getStartIndex();
+ }
+
+ private void loadLogSegments(ConfigurationManager confManager,
+ long lastIndexInSnapshot) throws IOException {
+ try(AutoCloseableLock writeLock = writeLock()) {
+ List<LogPathAndIndex> paths = storage.getStorageDir().getLogSegmentFiles();
+ for (LogPathAndIndex pi : paths) {
+ LogSegment logSegment = parseLogSegment(pi, confManager);
+ cache.addSegment(logSegment);
+ }
+
+ // if the largest index is smaller than the last index in snapshot, we do
+ // not load the log to avoid holes between log segments. This may happen
+ // when the local I/O worker is too slow to persist log (slower than
+ // committing the log and taking snapshot)
+ if (!cache.isEmpty() && cache.getEndIndex() < lastIndexInSnapshot) {
+ LOG.warn("End log index {} is smaller than last index in snapshot {}",
+ cache.getEndIndex(), lastIndexInSnapshot);
+ cache.clear();
+ // TODO purge all segment files
+ }
+ }
+ }
+
+ private LogSegment parseLogSegment(LogPathAndIndex pi,
+ ConfigurationManager confManager) throws IOException {
+ final boolean isOpen = pi.endIndex == RaftServerConstants.INVALID_LOG_INDEX;
+ return LogSegment.loadSegment(pi.path.toFile(), pi.startIndex, pi.endIndex,
+ isOpen, confManager);
+ }
+
+ @Override
+ public LogEntryProto get(long index) {
+ checkLogState();
+ try(AutoCloseableLock readLock = readLock()) {
+ return cache.getEntry(index);
+ }
+ }
+
+ @Override
+ public LogEntryProto[] getEntries(long startIndex, long endIndex) {
+ checkLogState();
+ try(AutoCloseableLock readLock = readLock()) {
+ return cache.getEntries(startIndex, endIndex);
+ }
+ }
+
+ @Override
+ public LogEntryProto getLastEntry() {
+ checkLogState();
+ try(AutoCloseableLock readLock = readLock()) {
+ return cache.getLastEntry();
+ }
+ }
+
+ /**
+ * The method, along with {@link #appendEntry} and
+ * {@link #append(LogEntryProto...)} need protection of RaftServer's lock.
+ */
+ @Override
+ void truncate(long index) {
+ checkLogState();
+ try(AutoCloseableLock writeLock = writeLock()) {
+ RaftLogCache.TruncationSegments ts = cache.truncate(index);
+ if (ts != null) {
+ Task task = fileLogWorker.truncate(ts);
+ myTask.set(task);
+ }
+ }
+ }
+
+ @Override
+ void appendEntry(LogEntryProto entry) {
+ checkLogState();
+ try(AutoCloseableLock writeLock = writeLock()) {
+ final LogSegment currentOpenSegment = cache.getOpenSegment();
+ if (currentOpenSegment == null) {
+ cache.addSegment(LogSegment.newOpenSegment(entry.getIndex()));
+ fileLogWorker.startLogSegment(getNextIndex());
+ } else if (isSegmentFull(currentOpenSegment, entry)) {
+ cache.rollOpenSegment(true);
+ fileLogWorker.rollLogSegment(currentOpenSegment);
+ } else if (currentOpenSegment.numOfEntries() > 0 &&
+ currentOpenSegment.getLastRecord().entry.getTerm() != entry.getTerm()) {
+ // the term changes
+ final long currentTerm = currentOpenSegment.getLastRecord().entry
+ .getTerm();
+ Preconditions.checkState(currentTerm < entry.getTerm(),
+ "open segment's term %s is larger than the new entry's term %s",
+ currentTerm, entry.getTerm());
+ cache.rollOpenSegment(true);
+ fileLogWorker.rollLogSegment(currentOpenSegment);
+ }
+
+ cache.appendEntry(entry);
+ myTask.set(fileLogWorker.writeLogEntry(entry));
+ }
+ }
+
+ private boolean isSegmentFull(LogSegment segment, LogEntryProto entry) {
+ if (segment.getTotalSize() >= segmentMaxSize) {
+ return true;
+ } else {
+ final long entrySize = LogSegment.getEntrySize(entry);
+ // if entry size is greater than the max segment size, write it directly
+ // into the current segment
+ return entrySize <= segmentMaxSize &&
+ segment.getTotalSize() + entrySize > segmentMaxSize;
+ }
+ }
+
+ @Override
+ public void append(LogEntryProto... entries) {
+ checkLogState();
+ if (entries == null || entries.length == 0) {
+ return;
+ }
+ try(AutoCloseableLock writeLock = writeLock()) {
+ Iterator<LogEntryProto> iter = cache.iterator(entries[0].getIndex());
+ int index = 0;
+ long truncateIndex = -1;
+ for (; iter.hasNext() && index < entries.length; index++) {
+ LogEntryProto storedEntry = iter.next();
+ Preconditions.checkState(
+ storedEntry.getIndex() == entries[index].getIndex(),
+ "The stored entry's index %s is not consistent with" +
+ " the received entries[%s]'s index %s", storedEntry.getIndex(),
+ index, entries[index].getIndex());
+
+ if (storedEntry.getTerm() != entries[index].getTerm()) {
+ // we should truncate from the storedEntry's index
+ truncateIndex = storedEntry.getIndex();
+ break;
+ }
+ }
+ if (truncateIndex != -1) {
+ // truncate from truncateIndex
+ truncate(truncateIndex);
+ }
+ // append from entries[index]
+ for (int i = index; i < entries.length; i++) {
+ appendEntry(entries[i]);
+ }
+ }
+ }
+
+ @Override
+ public void logSync() throws InterruptedException {
+ CodeInjectionForTesting.execute(LOG_SYNC, getSelfId(), null);
+ final Task task = myTask.get();
+ if (task != null) {
+ task.waitForDone();
+ }
+ }
+
+ @Override
+ public long getLatestFlushedIndex() {
+ return fileLogWorker.getFlushedIndex();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * This operation is protected by the RaftServer's lock
+ */
+ @Override
+ public void writeMetadata(long term, String votedFor) throws IOException {
+ storage.getMetaFile().set(term, votedFor);
+ }
+
+ @Override
+ public Metadata loadMetadata() throws IOException {
+ return new Metadata(storage.getMetaFile().getVotedFor(),
+ storage.getMetaFile().getTerm());
+ }
+
+ @Override
+ public void syncWithSnapshot(long lastSnapshotIndex) {
+ fileLogWorker.syncWithSnapshot(lastSnapshotIndex);
+ // TODO purge log files and normal/tmp/corrupt snapshot files
+ // if the last index in snapshot is larger than the index of the last
+ // log entry, we should delete all the log entries and their cache to avoid
+ // gaps between log segments.
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ fileLogWorker.close();
+ storage.close();
+ }
+
+ @VisibleForTesting
+ RaftLogCache getRaftLogCache() {
+ return cache;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
new file mode 100644
index 0000000..73b2af9
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+import org.apache.ratis.io.MD5Hash;
+import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.MD5FileUtil;
+import org.apache.ratis.util.RaftUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Manage snapshots of a raft peer.
+ * TODO: snapshot should be treated as compaction log thus can be merged into
+ * RaftLog. In this way we can have a unified getLastTermIndex interface.
+ */
+public class SnapshotManager {
+  private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);
+
+  private final RaftStorage storage;
+  private final String selfId;
+
+  public SnapshotManager(RaftStorage storage, String selfId)
+      throws IOException {
+    this.storage = storage;
+    this.selfId = selfId;
+  }
+
+  /**
+   * Write the file chunks of an install-snapshot request into a temp dir
+   * obtained from {@code getNewTempDir()}.
+   *
+   * When a chunk is marked done, the MD5 digest of the written file is checked
+   * against the chunk's digest and an MD5 meta-file is saved. When the request
+   * is marked done, the temp dir replaces the state machine dir.
+   *
+   * @throws IOException if the state machine already has a snapshot at or
+   *         beyond the request's index, on an MD5 mismatch, or if the temp dir
+   *         cannot be created or renamed.
+   */
+  public void installSnapshot(StateMachine stateMachine,
+      InstallSnapshotRequestProto request) throws IOException {
+    final long lastIncludedIndex = request.getTermIndex().getIndex();
+    final RaftStorageDirectory dir = storage.getStorageDir();
+
+    final File tmpDir = dir.getNewTempDir();
+    if (!tmpDir.mkdirs() && !tmpDir.isDirectory()) {
+      throw new IOException("Failed to create tmp dir " + tmpDir);
+    }
+    tmpDir.deleteOnExit();
+
+    LOG.info("Installing snapshot:{}, to tmp dir:{}", request, tmpDir);
+
+    // TODO: Make sure that subsequent requests for the same installSnapshot are coming in order,
+    // and are not lost when whole request cycle is done. Check requestId and requestIndex here
+
+    for (FileChunkProto chunk : request.getFileChunksList()) {
+      SnapshotInfo pi = stateMachine.getLatestSnapshot();
+      if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) {
+        throw new IOException("There exists snapshot file "
+            + pi.getFiles() + " in " + selfId
+            + " with endIndex >= lastIncludedIndex " + lastIncludedIndex);
+      }
+
+      String fileName = chunk.getFilename(); // this is relative to the root dir
+      // TODO: assumes flat layout inside SM dir
+      File tmpSnapshotFile = new File(tmpDir,
+          new File(dir.getRoot(), fileName).getName());
+
+      writeChunk(tmpSnapshotFile, chunk);
+
+      // verify the md5 digest and create the md5 meta-file if this is the last
+      // chunk of the file.
+      if (chunk.getDone()) {
+        final MD5Hash expectedDigest =
+            new MD5Hash(chunk.getFileDigest().toByteArray());
+        // calculate the checksum of the snapshot file and compare it with the
+        // file digest in the request
+        MD5Hash digest = MD5FileUtil.computeMd5ForFile(tmpSnapshotFile);
+        if (!digest.equals(expectedDigest)) {
+          LOG.warn("The snapshot md5 digest {} does not match expected {}",
+              digest, expectedDigest);
+          // TODO: rename the temp snapshot file to .corrupt
+          throw new IOException("MD5 mismatch for snapshot-" + lastIncludedIndex
+              + " installation");
+        } else {
+          MD5FileUtil.saveMD5File(tmpSnapshotFile, digest);
+        }
+      }
+    }
+
+    if (request.getDone()) {
+      final File smDir = dir.getStateMachineDir();
+      LOG.info("Install snapshot is done, renaming tmp dir:{} to:{}",
+          tmpDir, smDir);
+      // File.delete() only removes empty directories, so delete recursively.
+      if (smDir.exists()) {
+        FileUtils.fullyDelete(smDir);
+      }
+      if (!tmpDir.renameTo(smDir)) {
+        throw new IOException("Failed to rename tmp dir " + tmpDir
+            + " to " + smDir);
+      }
+    }
+  }
+
+  /**
+   * Write the data of {@code chunk} at its offset in {@code tmpSnapshotFile}.
+   * A chunk with offset 0 restarts the file from scratch. Any other chunk
+   * requires the file to exist already.
+   */
+  private static void writeChunk(File tmpSnapshotFile, FileChunkProto chunk)
+      throws IOException {
+    final long offset = chunk.getOffset();
+    if (offset == 0) {
+      // discard any temp file left over from an earlier transfer
+      if (tmpSnapshotFile.exists()) {
+        FileUtils.fullyDelete(tmpSnapshotFile);
+      }
+    } else {
+      Preconditions.checkState(tmpSnapshotFile.exists());
+    }
+    // An append-mode stream always writes at end-of-file and ignores the
+    // channel position, so seek explicitly with a RandomAccessFile instead.
+    try (java.io.RandomAccessFile raf =
+             new java.io.RandomAccessFile(tmpSnapshotFile, "rw")) {
+      if (offset == 0) {
+        raf.setLength(0);
+      }
+      raf.seek(offset);
+      raf.write(chunk.getData().toByteArray());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java
new file mode 100644
index 0000000..397a12b
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.statemachine;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
+import org.apache.ratis.util.LifeCycle;
+
+/**
+ * Base implementation for StateMachines.
+ *
+ * Transactions are passed through unchanged. An applied entry replies with its
+ * own data. {@link #takeSnapshot()} reports
+ * {@code RaftServerConstants.INVALID_LOG_INDEX}, and the storage never holds a
+ * snapshot. Subclasses override whatever they need.
+ */
+public class BaseStateMachine implements StateMachine {
+
+  protected RaftProperties properties;
+  protected RaftStorage storage;
+  protected RaftConfiguration raftConf;
+  protected final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName());
+
+  /* ------------------------------ life cycle ------------------------------ */
+
+  /** Remember the properties and storage, and tag the life cycle with {@code id}. */
+  @Override
+  public void initialize(String id, RaftProperties properties, RaftStorage storage)
+      throws IOException {
+    lifeCycle.setName(getClass().getSimpleName() + ":" + id);
+    this.properties = properties;
+    this.storage = storage;
+  }
+
+  /** No-op in the base implementation. */
+  @Override
+  public void reinitialize(String id, RaftProperties properties, RaftStorage storage)
+      throws IOException {
+  }
+
+  /** No-op in the base implementation. */
+  @Override
+  public void pause() {
+  }
+
+  @Override
+  public LifeCycle.State getLifeCycleState() {
+    return lifeCycle.getCurrentState();
+  }
+
+  /** No-op in the base implementation. */
+  @Override
+  public void close() throws IOException {
+  }
+
+  /* ---------------------------- configuration ----------------------------- */
+
+  @Override
+  public RaftConfiguration getRaftConfiguration() {
+    return raftConf;
+  }
+
+  @Override
+  public void setRaftConfiguration(RaftConfiguration conf) {
+    raftConf = conf;
+  }
+
+  /* ----------------------------- transactions ----------------------------- */
+
+  /** Wrap the request's message content into a state machine log entry. */
+  @Override
+  public TransactionContext startTransaction(RaftClientRequest request)
+      throws IOException {
+    final SMLogEntryProto smLogEntry = SMLogEntryProto.newBuilder()
+        .setData(request.getMessage().getContent())
+        .build();
+    return new TransactionContext(this, request, smLogEntry);
+  }
+
+  @Override
+  public TransactionContext preAppendTransaction(TransactionContext trx) throws IOException {
+    return trx;
+  }
+
+  @Override
+  public TransactionContext cancelTransaction(TransactionContext trx) throws IOException {
+    return trx;
+  }
+
+  @Override
+  public TransactionContext applyTransactionSerial(TransactionContext trx) throws IOException {
+    return trx;
+  }
+
+  /** Reply with the data carried by the applied log entry. */
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) throws IOException {
+    // the lambda reads the entry only when the reply's content is requested
+    final Message echo = () -> trx.getLogEntry().get().getSmLogEntry().getData();
+    return CompletableFuture.completedFuture(echo);
+  }
+
+  /** Ignored by the base implementation. */
+  @Override
+  public void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException {
+  }
+
+  /** The base implementation has no query support and returns {@code null}. */
+  @Override
+  public CompletableFuture<RaftClientReply> query(RaftClientRequest request) {
+    return null;
+  }
+
+  /* ------------------------------- snapshots ------------------------------ */
+
+  @Override
+  public long takeSnapshot() throws IOException {
+    return RaftServerConstants.INVALID_LOG_INDEX;
+  }
+
+  @Override
+  public SnapshotInfo getLatestSnapshot() {
+    final StateMachineStorage smStorage = getStateMachineStorage();
+    return smStorage.getLatestSnapshot();
+  }
+
+  /** A new storage object on each call; it does nothing and holds no snapshot. */
+  @Override
+  public StateMachineStorage getStateMachineStorage() {
+    return new StateMachineStorage() {
+      @Override
+      public void init(RaftStorage raftStorage) throws IOException {
+      }
+
+      @Override
+      public void format() throws IOException {
+      }
+
+      @Override
+      public SnapshotInfo getLatestSnapshot() {
+        return null;
+      }
+    };
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/FileListSnapshotInfo.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/FileListSnapshotInfo.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/FileListSnapshotInfo.java
new file mode 100644
index 0000000..1858603
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/FileListSnapshotInfo.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.FileInfo;
+
+/**
+ * Each snapshot has a list of files.
+ *
+ * The objects of this class are immutable: the file list is copied on
+ * construction and exposed only as an unmodifiable view.
+ */
+public class FileListSnapshotInfo implements SnapshotInfo {
+  private final TermIndex termIndex;
+  private final List<FileInfo> files;
+
+  /**
+   * @param files the files of the snapshot; copied, so later changes to the
+   *              caller's list do not affect this object
+   * @param term  the term corresponding to this snapshot
+   * @param index the index corresponding to this snapshot
+   */
+  public FileListSnapshotInfo(List<FileInfo> files, long term, long index) {
+    this.termIndex = TermIndex.newTermIndex(term, index);
+    // defensive copy: wrapping the caller's list alone would not make it immutable
+    this.files = Collections.unmodifiableList(new java.util.ArrayList<>(files));
+  }
+
+  @Override
+  public TermIndex getTermIndex() {
+    return termIndex;
+  }
+
+  @Override
+  public long getTerm() {
+    return termIndex.getTerm();
+  }
+
+  @Override
+  public long getIndex() {
+    return termIndex.getIndex();
+  }
+
+  /** @return an unmodifiable list of the snapshot's files. */
+  @Override
+  public List<FileInfo> getFiles() {
+    return files;
+  }
+
+  @Override
+  public String toString() {
+    return termIndex + ":" + files;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java
new file mode 100644
index 0000000..d417db7
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.ratis.io.MD5Hash;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.util.AtomicFileOutputStream;
+import org.apache.ratis.util.MD5FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A StateMachineStorage that stores the snapshot in a single file.
+ *
+ * Snapshot files live in the state machine directory of the raft storage and
+ * are named "snapshot.TERM_INDEX". Temporary and corrupt snapshots use the same
+ * name plus {@code AtomicFileOutputStream.TMP_EXTENSION} or ".corrupt".
+ */
+public class SimpleStateMachineStorage implements StateMachineStorage {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SimpleStateMachineStorage.class);
+
+  static final String SNAPSHOT_FILE_PREFIX = "snapshot";
+  static final String CORRUPT_SNAPSHOT_FILE_SUFFIX = ".corrupt";
+  /** snapshot.term_index */
+  static final Pattern SNAPSHOT_REGEX =
+      Pattern.compile(SNAPSHOT_FILE_PREFIX + "\\.(\\d+)_(\\d+)");
+
+  private RaftStorage raftStorage;
+  private File smDir = null;
+
+  /** The snapshot found by the most recent {@link #loadLatestSnapshot()}. */
+  private volatile SingleFileSnapshotInfo currentSnapshot = null;
+
+  /**
+   * Bind to the state machine directory of {@code raftStorage} and load the
+   * latest snapshot found there.
+   */
+  @Override
+  public void init(RaftStorage raftStorage) throws IOException {
+    this.raftStorage = raftStorage;
+    this.smDir = raftStorage.getStorageDir().getStateMachineDir();
+    loadLatestSnapshot();
+  }
+
+  @Override
+  public void format() throws IOException {
+    // TODO
+  }
+
+  /**
+   * Parse the term and index encoded in a snapshot file name.
+   *
+   * @throws IllegalArgumentException if the name does not match
+   *         {@link #SNAPSHOT_REGEX}.
+   */
+  @VisibleForTesting
+  public static TermIndex getTermIndexFromSnapshotFile(File file) {
+    final String name = file.getName();
+    final Matcher m = SNAPSHOT_REGEX.matcher(name);
+    if (!m.matches()) {
+      throw new IllegalArgumentException("File \"" + file
+          + "\" does not match snapshot file name pattern \""
+          + SNAPSHOT_REGEX + "\"");
+    }
+    final long term = Long.parseLong(m.group(1));
+    final long index = Long.parseLong(m.group(2));
+    return TermIndex.newTermIndex(term, index);
+  }
+
+  protected static String getTmpSnapshotFileName(long term, long endIndex) {
+    return getSnapshotFileName(term, endIndex) + AtomicFileOutputStream.TMP_EXTENSION;
+  }
+
+  protected static String getCorruptSnapshotFileName(long term, long endIndex) {
+    return getSnapshotFileName(term, endIndex) + CORRUPT_SNAPSHOT_FILE_SUFFIX;
+  }
+
+  public File getSnapshotFile(long term, long endIndex) {
+    return new File(smDir, getSnapshotFileName(term, endIndex));
+  }
+
+  protected File getTmpSnapshotFile(long term, long endIndex) {
+    return new File(smDir, getTmpSnapshotFileName(term, endIndex));
+  }
+
+  protected File getCorruptSnapshotFile(long term, long endIndex) {
+    return new File(smDir, getCorruptSnapshotFileName(term, endIndex));
+  }
+
+  /**
+   * Scan the state machine directory for files whose names fully match
+   * {@link #SNAPSHOT_REGEX}.
+   *
+   * @return the matching file with the largest index (the first one seen wins
+   *         on ties), together with its stored MD5 digest; {@code null} if no
+   *         file matches.
+   */
+  public SingleFileSnapshotInfo findLatestSnapshot() throws IOException {
+    SingleFileSnapshotInfo latest = null;
+    try (DirectoryStream<Path> stream =
+             Files.newDirectoryStream(smDir.toPath())) {
+      for (Path path : stream) {
+        Matcher matcher = SNAPSHOT_REGEX.matcher(path.getFileName().toString());
+        if (matcher.matches()) {
+          final long endIndex = Long.parseLong(matcher.group(2));
+          if (latest == null || endIndex > latest.getIndex()) {
+            final long term = Long.parseLong(matcher.group(1));
+            MD5Hash fileDigest = MD5FileUtil.readStoredMd5ForFile(path.toFile());
+            final FileInfo fileInfo = new FileInfo(path, fileDigest);
+            latest = new SingleFileSnapshotInfo(fileInfo, term, endIndex);
+          }
+        }
+      }
+    }
+    return latest;
+  }
+
+  /** Refresh the cached latest snapshot from disk. */
+  public void loadLatestSnapshot() throws IOException {
+    this.currentSnapshot = findLatestSnapshot();
+  }
+
+  /** @return "snapshot.TERM_ENDINDEX" */
+  public static String getSnapshotFileName(long term, long endIndex) {
+    return SNAPSHOT_FILE_PREFIX + "." + term + "_" + endIndex;
+  }
+
+  @Override
+  public SingleFileSnapshotInfo getLatestSnapshot() {
+    return currentSnapshot;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/SingleFileSnapshotInfo.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/SingleFileSnapshotInfo.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/SingleFileSnapshotInfo.java
new file mode 100644
index 0000000..5bca2c9
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/SingleFileSnapshotInfo.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import java.util.Arrays;
+
+import org.apache.ratis.server.storage.FileInfo;
+
+/**
+ * A {@link FileListSnapshotInfo} whose snapshot consists of exactly one file.
+ *
+ * The objects of this class are immutable.
+ */
+public class SingleFileSnapshotInfo extends FileListSnapshotInfo {
+
+  /**
+   * @param fileInfo the only file of the snapshot
+   * @param term     the term corresponding to this snapshot
+   * @param endIndex the index corresponding to this snapshot
+   */
+  public SingleFileSnapshotInfo(FileInfo fileInfo, long term, long endIndex) {
+    super(Arrays.asList(fileInfo), term, endIndex);
+  }
+
+  /**
+   * @return the one file held by this snapshot, i.e. the first (and only)
+   *         element of {@link #getFiles()}.
+   */
+  public FileInfo getFile() {
+    final int onlyFileIndex = 0;
+    return getFiles().get(onlyFileIndex);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java
new file mode 100644
index 0000000..f0aadd9
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import java.util.List;
+
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.FileInfo;
+
+/**
+ * SnapshotInfo represents a durable state persisted by the state machine. The state machine
+ * implementation is responsible for the layout of the snapshot files as well as for making the
+ * data durable. The latest term, the latest index, and the raft configuration must be saved
+ * together with any data files in the snapshot.
+ */
+public interface SnapshotInfo {
+
+  /**
+   * Returns the term and index corresponding to this snapshot.
+   * @return the term and index corresponding to this snapshot.
+   */
+  TermIndex getTermIndex();
+
+  /**
+   * Returns the term corresponding to this snapshot.
+   * @return the term corresponding to this snapshot.
+   */
+  long getTerm();
+
+  /**
+   * Returns the index corresponding to this snapshot.
+   * @return the index corresponding to this snapshot.
+   */
+  long getIndex();
+
+  /**
+   * Returns the list of files corresponding to this snapshot. The list should include all
+   * the files that the state machine keeps in its data directory; these files will be
+   * copied to other replicas in install-snapshot RPCs.
+   * @return the list of files corresponding to this snapshot.
+   */
+  List<FileInfo> getFiles();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
new file mode 100644
index 0000000..e377aa7
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.util.LifeCycle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * StateMachine is the entry point for the custom implementation of replicated state as defined in
+ * the "State Machine Approach" in the literature
+ * (see https://en.wikipedia.org/wiki/State_machine_replication).
+ */
+public interface StateMachine extends Closeable {
+  /**
+   * Initializes the state machine with the given properties and storage. The state machine is
+   * responsible for reading the latest snapshot from the file system (if any) and for
+   * initializing itself with the latest term and index there, including all the edits.
+   *
+   * @param id an identifier supplied by the caller (presumably the raft server id)
+   * @param properties the configuration properties
+   * @param storage the raft storage
+   * @throws IOException if initialization fails
+   */
+  void initialize(String id, RaftProperties properties, RaftStorage storage)
+      throws IOException;
+
+  /**
+   * Returns the lifecycle state for this StateMachine.
+   * @return the lifecycle state.
+   */
+  LifeCycle.State getLifeCycleState();
+
+  /**
+   * Pauses the state machine. On return, the state machine should have closed all open files so
+   * that a new snapshot can be installed.
+   */
+  void pause();
+
+  /**
+   * Re-initializes the state machine in PAUSED state with the given properties and storage. The
+   * state machine is responsible for reading the latest snapshot from the file system (if any)
+   * and for initializing itself with the latest term and index there, including all the edits.
+   *
+   * @param id an identifier supplied by the caller (presumably the raft server id)
+   * @param properties the configuration properties
+   * @param storage the raft storage
+   * @throws IOException if re-initialization fails
+   */
+  void reinitialize(String id, RaftProperties properties, RaftStorage storage)
+      throws IOException;
+
+  /**
+   * Dump the in-memory state into a snapshot file in the RaftStorage. The
+   * StateMachine implementation can decide 1) its own snapshot format, 2) when
+   * a snapshot is taken, and 3) how the snapshot is taken (e.g., whether the
+   * snapshot blocks the state machine, and whether to purge log entries after
+   * a snapshot is done).
+   *
+   * In the meanwhile, when the size of raft log outside of the latest snapshot
+   * exceeds certain threshold, the RaftServer may choose to trigger a snapshot
+   * if {@link RaftServerConfigKeys#RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY} is
+   * enabled.
+   *
+   * The snapshot should include the latest raft configuration.
+   *
+   * @return the largest index of the log entry that has been applied to the
+   *         state machine and also included in the snapshot. Note the log purge
+   *         should be handled separately.
+   * @throws IOException if the snapshot cannot be written
+   */
+  // TODO: refactor this
+  long takeSnapshot() throws IOException;
+
+  /**
+   * Record the RaftConfiguration in the state machine. The RaftConfiguration
+   * should also be stored in the snapshot.
+   *
+   * @param conf the raft configuration to record
+   */
+  void setRaftConfiguration(RaftConfiguration conf);
+
+  /**
+   * @return the latest raft configuration recorded in the state machine.
+   */
+  RaftConfiguration getRaftConfiguration();
+
+  /**
+   * @return StateMachineStorage to interact with the durability guarantees provided by the
+   *         state machine.
+   */
+  StateMachineStorage getStateMachineStorage();
+
+  /**
+   * Returns the information for the latest durable snapshot.
+   * @return the latest durable snapshot information.
+   */
+  SnapshotInfo getLatestSnapshot();
+
+  /**
+   * Query the state machine. The request must be read-only.
+   * TODO: extend RaftClientRequest to have a read-only request subclass.
+   *
+   * @param request the read-only client request
+   * @return a future of the reply to the query
+   */
+  CompletableFuture<RaftClientReply> query(RaftClientRequest request);
+
+  /**
+   * Validate/pre-process the incoming update request in the state machine.
+   *
+   * @param request the incoming client request
+   * @return the content to be written to the log entry. Null means the request
+   *         should be rejected.
+   * @throws IOException thrown by the state machine while validation
+   */
+  TransactionContext startTransaction(RaftClientRequest request)
+      throws IOException;
+
+  /**
+   * This is called before the transaction passed from the StateMachine is appended to the raft log.
+   * This method will be called from log append and has the same strict serial order that the
+   * transactions will have in the RAFT log. Since this is called serially in the critical path of
+   * log append, it is important to do only required operations here.
+   *
+   * @param trx the transaction about to be appended
+   * @return The Transaction context.
+   */
+  TransactionContext preAppendTransaction(TransactionContext trx) throws IOException;
+
+  /**
+   * Called to notify the state machine that the Transaction passed cannot be appended (or synced).
+   * The exception field will indicate whether there was an exception or not.
+   *
+   * @param trx the transaction to cancel
+   * @return cancelled transaction
+   */
+  TransactionContext cancelTransaction(TransactionContext trx) throws IOException;
+
+  /**
+   * Called for transactions that have been committed to the RAFT log. This step is called
+   * sequentially in strict serial order that the transactions have been committed in the log.
+   * The SM is expected to do only necessary work, and leave the actual apply operation to the
+   * applyTransaction calls that can happen concurrently.
+   *
+   * @param trx the transaction state including the log entry that has been committed to a quorum
+   *            of the raft peers
+   * @return The Transaction context.
+   */
+  TransactionContext applyTransactionSerial(TransactionContext trx) throws IOException;
+
+  /**
+   * Apply a committed log entry to the state machine. This method can be called concurrently with
+   * the other calls, and there is no guarantee that the calls will be ordered according to the
+   * log commit order.
+   *
+   * @param trx the transaction state including the log entry that has been committed to a quorum
+   *            of the raft peers
+   * @return a future of the {@link Message} produced by applying the transaction
+   */
+  // TODO: We do not need to return CompletableFuture
+  CompletableFuture<Message> applyTransaction(TransactionContext trx) throws IOException;
+
+  /**
+   * Notify the state machine that the raft peer is no longer leader.
+   *
+   * @param pendingEntries the pending transactions handed back to the state machine
+   */
+  void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
new file mode 100644
index 0000000..4f7951a
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import java.io.IOException;
+
+import org.apache.ratis.server.storage.RaftStorage;
+
+public interface StateMachineStorage {
+
+  /**
+   * Initializes this storage on top of the given {@link RaftStorage}.
+   *
+   * @param raftStorage the raft storage backing this state machine storage
+   * @throws IOException if initialization fails
+   */
+  void init(RaftStorage raftStorage) throws IOException;
+
+  /**
+   * Formats this storage.
+   *
+   * @throws IOException if formatting fails
+   */
+  void format() throws IOException;
+
+  /**
+   * Returns the information for the latest durable snapshot.
+   *
+   * @return the latest durable snapshot information
+   */
+  SnapshotInfo getLatestSnapshot();
+
+  // TODO: StateMachine can decide to compact the files independently of concurrent install snapshot
+  // etc requests. We should have ref counting for the SnapshotInfo with a release mechanism
+  // so that raft server will release the files after the snapshot file copy in case a compaction
+  // is waiting for deleting these files.
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
new file mode 100644
index 0000000..81bea45
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Optional;
+
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
+
+/**
+ * Context for a transaction.
+ * The transaction might have originated from a client request, or it
+ * may be coming from another replica of the state machine through the RAFT log.
+ * {@link TransactionContext} can be created from
+ * either the {@link StateMachine} or the state machine updater.
+ *
+ * In the first case, the {@link StateMachine} is a leader. When it receives
+ * a {@link StateMachine#startTransaction(RaftClientRequest)} request, it returns
+ * a {@link TransactionContext} with the changes from the {@link StateMachine}.
+ * The same context will be passed back to the {@link StateMachine}
+ * via the {@link StateMachine#applyTransaction(TransactionContext)} call
+ * or the {@link StateMachine#notifyNotLeader(Collection)} call.
+ *
+ * In the second case, the {@link StateMachine} is a follower.
+ * The {@link TransactionContext} will be a committed entry coming from
+ * the RAFT log from the leader.
+ */
+public class TransactionContext {
+
+  /** The {@link StateMachine} that originated the transaction. */
+  private final StateMachine stateMachine;
+
+  /** Original request from the client */
+  private Optional<RaftClientRequest> clientRequest = Optional.empty();
+
+  /** Exception from the {@link StateMachine} or from the log */
+  private Optional<Exception> exception = Optional.empty();
+
+  /** Data from the {@link StateMachine} */
+  private Optional<SMLogEntryProto> smLogEntryProto = Optional.empty();
+
+  /**
+   * Context specific to the state machine.
+   * The {@link StateMachine} can use this object to carry state between
+   * {@link StateMachine#startTransaction(RaftClientRequest)} and
+   * {@link StateMachine#applyTransaction(TransactionContext)}.
+   */
+  private Optional<Object> stateMachineContext = Optional.empty();
+
+  /**
+   * Whether to commit the transaction to the RAFT Log.
+   * In some cases the {@link StateMachine} may want to indicate
+   * that the transaction should not be committed
+   */
+  private boolean shouldCommit = true;
+
+  /** Committed LogEntry. */
+  private Optional<LogEntryProto> logEntry = Optional.empty();
+
+  private TransactionContext(StateMachine stateMachine) {
+    this.stateMachine = stateMachine;
+  }
+
+  /** The same as this(stateMachine, clientRequest, smLogEntryProto, null). */
+  public TransactionContext(
+      StateMachine stateMachine, RaftClientRequest clientRequest,
+      SMLogEntryProto smLogEntryProto) {
+    this(stateMachine, clientRequest, smLogEntryProto, null);
+  }
+
+  /**
+   * Construct a {@link TransactionContext} from a client request.
+   * Used by the state machine to start a transaction
+   * and send the Log entry representing the transaction data
+   * to be applied to the raft log.
+   *
+   * @param clientRequest the originating request; must not be null
+   *        ({@link Optional#of} throws NullPointerException otherwise)
+   * @param smLogEntryProto the state machine data, may be null
+   * @param stateMachineContext state machine specific context, may be null
+   */
+  public TransactionContext(
+      StateMachine stateMachine, RaftClientRequest clientRequest,
+      SMLogEntryProto smLogEntryProto, Object stateMachineContext) {
+    this(stateMachine);
+    this.clientRequest = Optional.of(clientRequest);
+    this.smLogEntryProto = Optional.ofNullable(smLogEntryProto);
+    this.stateMachineContext = Optional.ofNullable(stateMachineContext);
+  }
+
+  /** The same as this(stateMachine, clientRequest, exception, null). */
+  public TransactionContext(
+      StateMachine stateMachine, RaftClientRequest clientRequest,
+      Exception exception) {
+    this(stateMachine, clientRequest, exception, null);
+  }
+
+  /**
+   * Construct a {@link TransactionContext} from a client request to signal
+   * an exception so that the RAFT server will fail the request on behalf
+   * of the {@link StateMachine}.
+   *
+   * @param clientRequest the originating request; must not be null
+   * @param exception the failure to report; must not be null
+   * @param stateMachineContext state machine specific context, may be null
+   */
+  public TransactionContext(
+      StateMachine stateMachine, RaftClientRequest clientRequest,
+      Exception exception, Object stateMachineContext) {
+    this(stateMachine);
+    this.clientRequest = Optional.of(clientRequest);
+    this.exception = Optional.of(exception);
+    this.stateMachineContext = Optional.ofNullable(stateMachineContext);
+  }
+
+  /**
+   * Construct a {@link TransactionContext} from a {@link LogEntryProto}.
+   * Used by followers for applying committed entries to the state machine.
+   *
+   * @param logEntry the log entry to be applied
+   */
+  public TransactionContext(StateMachine stateMachine, LogEntryProto logEntry) {
+    this(stateMachine);
+    this.smLogEntryProto = Optional.of(logEntry.getSmLogEntry());
+    this.logEntry = Optional.of(logEntry);
+  }
+
+  /** @return the originating client request, if any. */
+  public Optional<RaftClientRequest> getClientRequest() {
+    return this.clientRequest;
+  }
+
+  /** @return the state machine data of this transaction, if any. */
+  public Optional<SMLogEntryProto> getSMLogEntry() {
+    return this.smLogEntryProto;
+  }
+
+  /** @return the exception recorded for this transaction, if any. */
+  public Optional<Exception> getException() {
+    return this.exception;
+  }
+
+  /** Replaces the state machine context; a null argument clears it. */
+  public TransactionContext setStateMachineContext(Object stateMachineContext) {
+    this.stateMachineContext = Optional.ofNullable(stateMachineContext);
+    return this;
+  }
+
+  /** @return the state machine specific context, if any. */
+  public Optional<Object> getStateMachineContext() {
+    return stateMachineContext;
+  }
+
+  /** Sets the log entry; the argument must not be null. */
+  public TransactionContext setLogEntry(LogEntryProto logEntry) {
+    this.logEntry = Optional.of(logEntry);
+    return this;
+  }
+
+  /** Sets the state machine data; the argument must not be null. */
+  public TransactionContext setSmLogEntryProto(SMLogEntryProto smLogEntryProto) {
+    this.smLogEntryProto = Optional.of(smLogEntryProto);
+    return this;
+  }
+
+  /** @return the log entry of this transaction, if any. */
+  public Optional<LogEntryProto> getLogEntry() {
+    return logEntry;
+  }
+
+  /** Sets whether the transaction should be committed to the RAFT log. */
+  public TransactionContext setShouldCommit(boolean shouldCommit) {
+    this.shouldCommit = shouldCommit;
+    return this;
+  }
+
+  /** @return whether the transaction should be committed to the RAFT log. */
+  public boolean shouldCommit() {
+    // TODO: Hook this up in the server to bypass the RAFT Log and send back a response to client
+    return this.shouldCommit;
+  }
+
+  // proxy StateMachine methods. We do not want to expose the SM to the RaftLog
+
+  /**
+   * This is called before the transaction passed from the StateMachine is appended to the raft log.
+   * This method will be called from log append and has the same strict serial order that the
+   * transactions will have in the RAFT log. Since this is called serially in the critical path of
+   * log append, it is important to do only required operations here.
+   *
+   * @return The Transaction context.
+   */
+  public TransactionContext preAppendTransaction() throws IOException {
+    return stateMachine.preAppendTransaction(this);
+  }
+
+  /**
+   * Called to notify the state machine that the Transaction passed cannot be appended (or synced).
+   * The exception field will indicate whether there was an exception or not.
+   *
+   * @return cancelled transaction
+   */
+  public TransactionContext cancelTransaction() throws IOException {
+    // TODO: This is not called from Raft server / log yet. When an IOException happens, we should
+    // call this to let the SM know that Transaction cannot be synced
+    return stateMachine.cancelTransaction(this);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
new file mode 100644
index 0000000..60cbb9c
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.client.impl.RaftClientImpl;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
+import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.storage.MemoryRaftLog;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.statemachine.BaseStateMachine;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.ExitUtils;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * An in-process cluster of {@link RaftServerImpl}s for tests. Subclasses supply
+ * the RPC layer that connects the servers.
+ */
+public abstract class MiniRaftCluster {
+  public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class);
+  public static final DelayLocalExecutionInjection logSyncDelay =
+      new DelayLocalExecutionInjection(RaftLog.LOG_SYNC);
+
+  public static final String CLASS_NAME = MiniRaftCluster.class.getSimpleName();
+  /** Property key naming the {@link StateMachine} class used by each test server. */
+  public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class";
+  public static final Class<? extends StateMachine> STATEMACHINE_CLASS_DEFAULT = BaseStateMachine.class;
+
+  /** Creates clusters of a concrete {@link MiniRaftCluster} type. */
+  public static abstract class Factory<CLUSTER extends MiniRaftCluster> {
+    public abstract CLUSTER newCluster(
+        String[] ids, RaftProperties prop, boolean formatted)
+        throws IOException;
+
+    /** Creates a cluster of {@code numServer} servers with ids s0, s1, .... */
+    public CLUSTER newCluster(
+        int numServer, RaftProperties prop, boolean formatted)
+        throws IOException {
+      return newCluster(generateIds(numServer, 0), prop, formatted);
+    }
+  }
+
+  /** Base class for clusters whose servers are wired together by an RPC layer. */
+  public static abstract class RpcBase extends MiniRaftCluster {
+    public RpcBase(String[] ids, RaftProperties properties, boolean formatted) {
+      super(ids, properties, formatted);
+    }
+
+    /** Sets up the RPC for the given peer and returns the peer's server. */
+    protected abstract RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException;
+
+    @Override
+    protected void setPeerRpc() throws IOException {
+      for (RaftPeer p : conf.getPeers()) {
+        setPeerRpc(p);
+      }
+    }
+
+    @Override
+    public void restartServer(String id, boolean format) throws IOException {
+      super.restartServer(id, format);
+      setPeerRpc(conf.getPeer(id)).start();
+    }
+
+    @Override
+    public void setBlockRequestsFrom(String src, boolean block) {
+      RaftTestUtil.setBlockRequestsFrom(src, block);
+    }
+  }
+
+  /** The peer lists describing a membership change. */
+  public static class PeerChanges {
+    public final RaftPeer[] allPeersInNewConf;
+    public final RaftPeer[] newPeers;
+    public final RaftPeer[] removedPeers;
+
+    public PeerChanges(RaftPeer[] all, RaftPeer[] newPeers, RaftPeer[] removed) {
+      this.allPeersInNewConf = all;
+      this.newPeers = newPeers;
+      this.removedPeers = removed;
+    }
+  }
+
+  /** Builds a configuration containing one {@link RaftPeer} per id. */
+  public static RaftConfiguration initConfiguration(String[] ids) {
+    return RaftConfiguration.newBuilder()
+        .setConf(Arrays.stream(ids).map(RaftPeer::new).collect(Collectors.toList()))
+        .build();
+  }
+
+  private static String getBaseDirectory() {
+    return System.getProperty("test.build.data", "target/test/data") + "/raft/";
+  }
+
+  /** Deletes the given directory, failing with IllegalStateException if it cannot be deleted. */
+  private static void formatDir(String dirStr) {
+    final File serverDir = new File(dirStr);
+    Preconditions.checkState(FileUtils.fullyDelete(serverDir),
+        "Failed to format directory %s", dirStr);
+    LOG.info("Formatted directory {}", dirStr);
+  }
+
+  /** @return {@code numServers} ids of the form "s" + n, with n starting at {@code base}. */
+  public static String[] generateIds(int numServers, int base) {
+    String[] ids = new String[numServers];
+    for (int i = 0; i < numServers; i++) {
+      ids[i] = "s" + (i + base);
+    }
+    return ids;
+  }
+
+  protected RaftConfiguration conf;
+  protected final RaftProperties properties;
+  private final String testBaseDir;
+  protected final Map<String, RaftServerImpl> servers =
+      Collections.synchronizedMap(new LinkedHashMap<>());
+
+  public MiniRaftCluster(String[] ids, RaftProperties properties,
+      boolean formatted) {
+    this.conf = initConfiguration(ids);
+    this.properties = new RaftProperties(properties);
+    this.testBaseDir = getBaseDirectory();
+
+    conf.getPeers().forEach(
+        p -> servers.put(p.getId(), newRaftServer(p.getId(), conf, formatted)));
+
+    ExitUtils.disableSystemExit();
+  }
+
+  /** Replaces the configuration with the given peers and attaches each peer's RPC to its server. */
+  protected <RPC extends RaftServerRpc> void init(Map<RaftPeer, RPC> peers) {
+    LOG.info("peers = {}", peers.keySet());
+    conf = RaftConfiguration.newBuilder().setConf(peers.keySet()).build();
+    for (Map.Entry<RaftPeer, RPC> entry : peers.entrySet()) {
+      final RaftServerImpl server = servers.get(entry.getKey().getId());
+      server.setInitialConf(conf);
+      server.setServerRpc(entry.getValue());
+    }
+  }
+
+  /** Starts every server in the cluster. */
+  public void start() {
+    LOG.info("Starting {}", getClass().getSimpleName());
+    servers.values().forEach(RaftServerImpl::start);
+  }
+
+  /**
+   * Kills the server with the given id and replaces it with a newly created
+   * server with the same id. This base implementation does not start the
+   * new server; {@link RpcBase} also sets up its RPC and starts it.
+   */
+  public void restartServer(String id, boolean format) throws IOException {
+    killServer(id);
+    servers.remove(id);
+    servers.put(id, newRaftServer(id, conf, format));
+  }
+
+  /** Closes all live servers, recreates every server, then sets up RPC and starts them. */
+  public final void restart(boolean format) throws IOException {
+    servers.values().stream().filter(RaftServerImpl::isAlive)
+        .forEach(RaftServerImpl::close);
+    List<String> idList = new ArrayList<>(servers.keySet());
+    for (String id : idList) {
+      servers.remove(id);
+      servers.put(id, newRaftServer(id, conf, format));
+    }
+
+    setPeerRpc();
+    start();
+  }
+
+  protected abstract void setPeerRpc() throws IOException;
+
+  public int getMaxTimeout() {
+    return properties.getInt(
+        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY,
+        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT);
+  }
+
+  public RaftConfiguration getConf() {
+    return conf;
+  }
+
+  /**
+   * Creates a server with the given id whose storage lives under the test base
+   * directory, optionally deleting any previous contents first.
+   */
+  private RaftServerImpl newRaftServer(String id, RaftConfiguration conf,
+      boolean format) {
+    final String dirStr = testBaseDir + id;
+    if (format) {
+      formatDir(dirStr);
+    }
+    // The storage directory is server specific, so each server gets its own copy
+    // of the properties. Setting it on the shared cluster-wide object would
+    // leave every server and state machine that holds that object seeing
+    // whichever directory was written last.
+    final RaftProperties serverProperties = new RaftProperties(properties);
+    serverProperties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, dirStr);
+    try {
+      return new RaftServerImpl(id, conf, serverProperties,
+          getStateMachine4Test(serverProperties));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Instantiates the state machine class configured under {@link #STATEMACHINE_CLASS_KEY}. */
+  static StateMachine getStateMachine4Test(RaftProperties properties) {
+    final Class<? extends StateMachine> smClass = properties.getClass(
+        STATEMACHINE_CLASS_KEY,
+        STATEMACHINE_CLASS_DEFAULT,
+        StateMachine.class);
+    return RaftUtils.newInstance(smClass);
+  }
+
+  public abstract RaftClientRequestSender getRaftClientRequestSender();
+
+  /** Attaches each new peer's RPC to its server and optionally starts the new servers. */
+  protected <RPC extends RaftServerRpc> Collection<RaftPeer> addNewPeers(
+      Map<RaftPeer, RPC> newPeers, Collection<RaftServerImpl> newServers,
+      boolean startService) throws IOException {
+    for (Map.Entry<RaftPeer, RPC> entry : newPeers.entrySet()) {
+      RaftServerImpl server = servers.get(entry.getKey().getId());
+      server.setServerRpc(entry.getValue());
+    }
+    if (startService) {
+      newServers.forEach(RaftServerImpl::start);
+    }
+    return new ArrayList<>(newPeers.keySet());
+  }
+
+  protected abstract Collection<RaftPeer> addNewPeers(
+      Collection<RaftPeer> newPeers, Collection<RaftServerImpl> newServers,
+      boolean startService) throws IOException;
+
+  public PeerChanges addNewPeers(int number, boolean startNewPeer)
+      throws IOException {
+    return addNewPeers(generateIds(number, servers.size()), startNewPeer);
+  }
+
+  /**
+   * Creates formatted servers for the given ids, registers them with the
+   * cluster and updates {@link #conf} to include both old and new peers.
+   */
+  public PeerChanges addNewPeers(String[] ids,
+      boolean startNewPeer) throws IOException {
+    LOG.info("Add new peers {}", Arrays.asList(ids));
+    Collection<RaftPeer> newPeers = new ArrayList<>(ids.length);
+    for (String id : ids) {
+      newPeers.add(new RaftPeer(id));
+    }
+
+    // create and add new RaftServers
+    final List<RaftServerImpl> newServers = new ArrayList<>(ids.length);
+    for (RaftPeer p : newPeers) {
+      RaftServerImpl newServer = newRaftServer(p.getId(), conf, true);
+      Preconditions.checkArgument(!servers.containsKey(p.getId()));
+      servers.put(p.getId(), newServer);
+      newServers.add(newServer);
+    }
+
+    // for hadoop-rpc-enabled peer, we assign inetsocketaddress here
+    newPeers = addNewPeers(newPeers, newServers, startNewPeer);
+
+    final RaftPeer[] np = newPeers.toArray(new RaftPeer[newPeers.size()]);
+    newPeers.addAll(conf.getPeers());
+    conf = RaftConfiguration.newBuilder().setConf(newPeers).setLogEntryIndex(0).build();
+    RaftPeer[] p = newPeers.toArray(new RaftPeer[newPeers.size()]);
+    return new PeerChanges(p, np, new RaftPeer[0]);
+  }
+
+  public void startServer(String id) {
+    RaftServerImpl server = servers.get(id);
+    assert server != null;
+    server.start();
+  }
+
+  private RaftPeer getPeer(RaftServerImpl s) {
+    return new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress());
+  }
+
+  /**
+   * prepare the peer list when removing some peers from the conf
+   */
+  public PeerChanges removePeers(int number, boolean removeLeader,
+      Collection<RaftPeer> excluded) {
+    Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers());
+    List<RaftPeer> removedPeers = new ArrayList<>(number);
+    if (removeLeader) {
+      final RaftPeer leader = getPeer(getLeader());
+      assert !excluded.contains(leader);
+      peers.remove(leader);
+      removedPeers.add(leader);
+    }
+    List<RaftServerImpl> followers = getFollowers();
+    for (int i = 0, removed = 0; i < followers.size() &&
+        removed < (removeLeader ? number - 1 : number); i++) {
+      RaftPeer toRemove = getPeer(followers.get(i));
+      if (!excluded.contains(toRemove)) {
+        peers.remove(toRemove);
+        removedPeers.add(toRemove);
+        removed++;
+      }
+    }
+    conf = RaftConfiguration.newBuilder().setConf(peers).setLogEntryIndex(0).build();
+    RaftPeer[] p = peers.toArray(new RaftPeer[peers.size()]);
+    return new PeerChanges(p, new RaftPeer[0],
+        removedPeers.toArray(new RaftPeer[removedPeers.size()]));
+  }
+
+  public void killServer(String id) {
+    servers.get(id).close();
+  }
+
+  public String printServers() {
+    StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n");
+    for (RaftServerImpl s : servers.values()) {
+      b.append("  ");
+      b.append(s).append("\n");
+    }
+    return b.toString();
+  }
+
+  public String printAllLogs() {
+    StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n");
+    for (RaftServerImpl s : servers.values()) {
+      b.append("  ");
+      b.append(s).append("\n");
+
+      final RaftLog log = s.getState().getLog();
+      if (log instanceof MemoryRaftLog) {
+        b.append("    ");
+        b.append(((MemoryRaftLog) log).getEntryString());
+      }
+    }
+    return b.toString();
+  }
+
+  /**
+   * @return the unique live leader with the highest term, or null if there is
+   *         no live leader; fails the test if several live servers claim
+   *         leadership in the highest term.
+   */
+  public RaftServerImpl getLeader() {
+    final List<RaftServerImpl> leaders = new ArrayList<>();
+    servers.values().stream()
+        .filter(s -> s.isAlive() && s.isLeader())
+        .forEach(s -> {
+          if (leaders.isEmpty()) {
+            leaders.add(s);
+          } else {
+            final long leaderTerm = leaders.get(0).getState().getCurrentTerm();
+            final long term = s.getState().getCurrentTerm();
+            if (term >= leaderTerm) {
+              if (term > leaderTerm) {
+                leaders.clear();
+              }
+              leaders.add(s);
+            }
+          }
+        });
+    if (leaders.isEmpty()) {
+      return null;
+    } else if (leaders.size() != 1) {
+      Assert.fail(printServers() + leaders.toString()
+          + ", leaders.size() = " + leaders.size() + " != 1");
+    }
+    return leaders.get(0);
+  }
+
+  public boolean isLeader(String leaderId) throws InterruptedException {
+    final RaftServerImpl leader = getLeader();
+    return leader != null && leader.getId().equals(leaderId);
+  }
+
+  public List<RaftServerImpl> getFollowers() {
+    return servers.values().stream()
+        .filter(s -> s.isAlive() && s.isFollower())
+        .collect(Collectors.toList());
+  }
+
+  public Collection<RaftServerImpl> getServers() {
+    return servers.values();
+  }
+
+  public RaftServerImpl getServer(String id) {
+    return servers.get(id);
+  }
+
+  /** @return a peer (id and RPC address) for every server in the cluster. */
+  public Collection<RaftPeer> getPeers() {
+    return getServers().stream().map(this::getPeer)
+        .collect(Collectors.toList());
+  }
+
+  public RaftClient createClient(String clientId, String leaderId) {
+    return new RaftClientImpl(clientId, conf.getPeers(),
+        getRaftClientRequestSender(), leaderId, properties);
+  }
+
+  /** Closes all live servers and fails if the test triggered an unexpected exit. */
+  public void shutdown() {
+    LOG.info("Stopping {}", getClass().getSimpleName());
+    servers.values().stream().filter(RaftServerImpl::isAlive)
+        .forEach(RaftServerImpl::close);
+
+    if (ExitUtils.isTerminated()) {
+      LOG.error("Test resulted in an unexpected exit",
+          ExitUtils.getFirstExitException());
+      throw new AssertionError("Test resulted in an unexpected exit");
+    }
+  }
+
+  /**
+   * Block all the incoming requests for the peer with leaderId. Also delay
+   * outgoing or incoming msg for all other peers.
+   */
+  protected abstract void blockQueueAndSetDelay(String leaderId, int delayMs)
+      throws InterruptedException;
+
+  /**
+   * Try to enforce the leader of the cluster.
+   * @param leaderId ID of the targeted leader server.
+   * @return true if server has been successfully enforced to the leader, false
+   *         otherwise.
+   */
+  public boolean tryEnforceLeader(String leaderId) throws InterruptedException {
+    // do nothing and see if the given id is already a leader.
+    if (isLeader(leaderId)) {
+      return true;
+    }
+
+    // Blocking all other server's RPC read process to make sure a read takes at
+    // least ELECTION_TIMEOUT_MIN. In this way when the target leader request a
+    // vote, all non-leader servers can grant the vote.
+    // Disable the target leader server RPC so that it can request a vote.
+    blockQueueAndSetDelay(leaderId, RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT);
+
+    // Reopen queues so that the vote can make progress.
+    blockQueueAndSetDelay(leaderId, 0);
+
+    return isLeader(leaderId);
+  }
+
+  /** Block/unblock the requests sent from the given source. */
+  public abstract void setBlockRequestsFrom(String src, boolean block);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
new file mode 100644
index 0000000..4ec78b9
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.junit.*;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ratis.RaftTestUtil.waitAndKillLeader;
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Basic Raft tests: leader election, log replication, enforcing a leader and
+ * changing the leader while clients are sending requests. Subclasses supply
+ * the {@link MiniRaftCluster} to run against.
+ */
+public abstract class RaftBasicTests {
+  public static final Logger LOG = LoggerFactory.getLogger(RaftBasicTests.class);
+
+  public static final int NUM_SERVERS = 5;
+
+  protected static final RaftProperties properties = new RaftProperties();
+
+  /**
+   * @return the cluster under test. {@link #setup()} asserts that it has no
+   *         leader yet and then starts it before each test.
+   */
+  public abstract MiniRaftCluster getCluster();
+
+  /** @return the properties shared by all tests of this class. */
+  public RaftProperties getProperties() {
+    return properties;
+  }
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(120 * 1000);
+
+  @Before
+  public void setup() throws IOException {
+    Assert.assertNull(getCluster().getLeader());
+    getCluster().start();
+  }
+
+  @After
+  public void tearDown() {
+    final MiniRaftCluster cluster = getCluster();
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testBasicLeaderElection() throws Exception {
+    LOG.info("Running testBasicLeaderElection");
+    final MiniRaftCluster cluster = getCluster();
+    // NOTE(review): the flag presumably says whether a new leader is expected.
+    // After three kills only two of the NUM_SERVERS servers remain, which is
+    // not a majority -- TODO confirm against RaftTestUtil.waitAndKillLeader.
+    waitAndKillLeader(cluster, true);
+    waitAndKillLeader(cluster, true);
+    waitAndKillLeader(cluster, true);
+    waitAndKillLeader(cluster, false);
+  }
+
+  @Test
+  public void testBasicAppendEntries() throws Exception {
+    LOG.info("Running testBasicAppendEntries");
+    final MiniRaftCluster cluster = getCluster();
+    RaftServerImpl leader = waitForLeader(cluster);
+    final long term = leader.getState().getCurrentTerm();
+    // Kill one follower; the remaining servers must still replicate the log.
+    final String killed = cluster.getFollowers().get(3).getId();
+    cluster.killServer(killed);
+    LOG.info(cluster.printServers());
+
+    final SimpleMessage[] messages = SimpleMessage.create(10);
+    try (final RaftClient client = cluster.createClient("client", null)) {
+      for (SimpleMessage message : messages) {
+        client.send(message);
+      }
+    }
+
+    // Give the followers time to catch up before their logs are inspected.
+    Thread.sleep(cluster.getMaxTimeout() + 100);
+    LOG.info(cluster.printAllLogs());
+
+    // Every live server must hold all the messages, read from index 1 on,
+    // in the term observed above.
+    cluster.getServers().stream().filter(RaftServerImpl::isAlive)
+        .map(s -> s.getState().getLog().getEntries(1, Long.MAX_VALUE))
+        .forEach(e -> RaftTestUtil.assertLogEntries(e, 1, term, messages));
+  }
+
+  @Test
+  public void testEnforceLeader() throws Exception {
+    LOG.info("Running testEnforceLeader");
+    // Assumes the servers are named "s0".."s<NUM_SERVERS-1>" -- TODO confirm.
+    final String leader = "s" + ThreadLocalRandom.current().nextInt(NUM_SERVERS);
+    LOG.info("enforce leader to " + leader);
+    final MiniRaftCluster cluster = getCluster();
+    waitForLeader(cluster);
+    waitForLeader(cluster, leader);
+  }
+
+  /**
+   * Client thread that sends its messages one at a time, exposing its
+   * progress through {@link #step} and recording the exception that stopped
+   * it, if any, in {@link #exceptionInClientThread}.
+   */
+  static class Client4TestWithLoad extends Thread {
+    final RaftClient client;
+    final SimpleMessage[] messages;
+
+    /** Index of the next message to send. */
+    final AtomicInteger step = new AtomicInteger();
+    /** The exception that stopped this thread, or null. */
+    volatile Exception exceptionInClientThread;
+
+    Client4TestWithLoad(RaftClient client, int numMessages) {
+      this.client = client;
+      this.messages = SimpleMessage.create(numMessages, client.getId());
+    }
+
+    /** @return true while messages remain and no exception has occurred. */
+    boolean isRunning() {
+      return step.get() < messages.length && exceptionInClientThread == null;
+    }
+
+    @Override
+    public void run() {
+      // Close the client on every exit path and record any exception, not
+      // only IOException: a thread that died on an unchecked exception would
+      // leave isRunning() true forever, and testWithLoad would spin until
+      // the global timeout instead of reporting the failure.
+      try (RaftClient c = client) {
+        while (isRunning()) {
+          c.send(messages[step.getAndIncrement()]);
+        }
+      } catch (Exception e) {
+        exceptionInClientThread = e;
+      }
+    }
+  }
+
+  @Test
+  public void testWithLoad() throws Exception {
+    testWithLoad(10, 500);
+  }
+
+  /**
+   * Runs numClients concurrent client threads, each sending numMessages
+   * messages. Whenever at least 50 * numClients more messages have been
+   * started in total, it forces a leader change. It then verifies that no
+   * client failed and that every client's messages are in the servers' logs.
+   */
+  private void testWithLoad(final int numClients, final int numMessages)
+      throws Exception {
+    LOG.info("Running testWithLoad: numClients=" + numClients
+        + ", numMessages=" + numMessages);
+
+    final MiniRaftCluster cluster = getCluster();
+    LOG.info(cluster.printServers());
+
+    final List<Client4TestWithLoad> clients
+        = Stream.iterate(0, i -> i + 1).limit(numClients)
+        .map(i -> cluster.createClient(String.valueOf((char)('a' + i)), null))
+        .map(c -> new Client4TestWithLoad(c, numMessages))
+        .collect(Collectors.toList());
+    clients.forEach(Thread::start);
+
+    int count = 0;
+    int lastStep = 0;
+    while (clients.stream().anyMatch(Client4TestWithLoad::isRunning)) {
+      final int n = clients.stream().mapToInt(c -> c.step.get()).sum();
+      // Change the leader only after 50 more steps per client (on average).
+      if (n - lastStep < 50 * numClients) {
+        Thread.sleep(10);
+        continue;
+      }
+      lastStep = n;
+      count++;
+
+      RaftServerImpl leader = cluster.getLeader();
+      if (leader != null) {
+        final String oldLeader = leader.getId();
+        LOG.info("Block all requests sent by leader " + oldLeader);
+        String newLeader = RaftTestUtil.changeLeader(cluster, oldLeader);
+        LOG.info("Changed leader from " + oldLeader + " to " + newLeader);
+        Assert.assertNotEquals(oldLeader, newLeader);
+      }
+    }
+
+    for (Client4TestWithLoad c : clients) {
+      c.join();
+    }
+    for (Client4TestWithLoad c : clients) {
+      if (c.exceptionInClientThread != null) {
+        throw new AssertionError(c.exceptionInClientThread);
+      }
+      RaftTestUtil.assertLogEntries(cluster.getServers(), c.messages);
+    }
+
+    LOG.info("Leader change count=" + count + cluster.printAllLogs());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
new file mode 100644
index 0000000..6d25835
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.client.impl.RaftClientImpl;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.simulation.RequestHandler;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.*;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+public abstract class RaftNotLeaderExceptionBaseTest {
+ static {
+ RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+ RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
+ RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
+ RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+ }
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(RaftNotLeaderExceptionBaseTest.class);
+ public static final int NUM_PEERS = 3;
+
+ @Rule
+ public Timeout globalTimeout = new Timeout(60 * 1000);
+
+ private MiniRaftCluster cluster;
+
+ public abstract MiniRaftCluster initCluster() throws IOException;
+
+ @Before
+ public void setup() throws IOException {
+ this.cluster = initCluster();
+ cluster.start();
+ }
+
+ @After
+ public void tearDown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testHandleNotLeaderException() throws Exception {
+ RaftTestUtil.waitForLeader(cluster);
+ final String leaderId = cluster.getLeader().getId();
+ final RaftClient client = cluster.createClient("client", leaderId);
+
+ RaftClientReply reply = client.send(new SimpleMessage("m1"));
+ Assert.assertTrue(reply.isSuccess());
+
+ // enforce leader change
+ String newLeader = RaftTestUtil.changeLeader(cluster, leaderId);
+ Assert.assertNotEquals(leaderId, newLeader);
+
+ RaftClientRequestSender rpc = ((RaftClientImpl)client).getRequestSender();
+ reply= null;
+ for (int i = 0; reply == null && i < 10; i++) {
+ try {
+ reply = rpc.sendRequest(
+ new RaftClientRequest("client", leaderId, DEFAULT_SEQNUM,
+ new SimpleMessage("m2")));
+ } catch (IOException ignored) {
+ Thread.sleep(1000);
+ }
+ }
+ Assert.assertNotNull(reply);
+ Assert.assertFalse(reply.isSuccess());
+ Assert.assertTrue(reply.isNotLeader());
+ Assert.assertEquals(newLeader,
+ reply.getNotLeaderException().getSuggestedLeader().getId());
+
+ reply = client.send(new SimpleMessage("m3"));
+ Assert.assertTrue(reply.isSuccess());
+ client.close();
+ }
+
+ @Test
+ public void testNotLeaderExceptionWithReconf() throws Exception {
+ Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster));
+
+ final String leaderId = cluster.getLeader().getId();
+ final RaftClient client = cluster.createClient("client", leaderId);
+
+ // enforce leader change
+ String newLeader = RaftTestUtil.changeLeader(cluster, leaderId);
+ Assert.assertNotEquals(leaderId, newLeader);
+
+ // also add two new peers
+ // add two more peers
+ MiniRaftCluster.PeerChanges change = cluster.addNewPeers(
+ new String[]{"ss1", "ss2"}, true);
+ // trigger setConfiguration
+ LOG.info("Start changing the configuration: {}",
+ Arrays.asList(change.allPeersInNewConf));
+ try(final RaftClient c2 = cluster.createClient("client2", newLeader)) {
+ RaftClientReply reply = c2.setConfiguration(change.allPeersInNewConf);
+ Assert.assertTrue(reply.isSuccess());
+ }
+ LOG.info(cluster.printServers());
+
+ RaftClientRequestSender rpc = ((RaftClientImpl)client).getRequestSender();
+ RaftClientReply reply = null;
+ // it is possible that the remote peer's rpc server is not ready. need retry
+ for (int i = 0; reply == null && i < 10; i++) {
+ try {
+ reply = rpc.sendRequest(
+ new RaftClientRequest("client", leaderId, DEFAULT_SEQNUM,
+ new SimpleMessage("m1")));
+ } catch (IOException ignored) {
+ Thread.sleep(1000);
+ }
+ }
+ Assert.assertNotNull(reply);
+ Assert.assertFalse(reply.isSuccess());
+ Assert.assertTrue(reply.isNotLeader());
+ Assert.assertEquals(newLeader,
+ reply.getNotLeaderException().getSuggestedLeader().getId());
+ Collection<RaftPeer> peers = cluster.getPeers();
+ RaftPeer[] peersFromReply = reply.getNotLeaderException().getPeers();
+ Assert.assertEquals(peers.size(), peersFromReply.length);
+ for (RaftPeer p : peersFromReply) {
+ Assert.assertTrue(peers.contains(p));
+ }
+
+ reply = client.send(new SimpleMessage("m2"));
+ Assert.assertTrue(reply.isSuccess());
+ client.close();
+ }
+}