You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/01/31 21:17:20 UTC
[35/54] [abbrv] incubator-ratis git commit: Renamed the packages from
raft to ratis in preperation for Apache Incubation - Moved all java packages
from org.apache.raft to org.apache.ratis. - Moved native package to
org_apache_ratis, and native lib to l
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java b/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java
deleted file mode 100644
index 293e1a4..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.commons.io.Charsets;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.impl.ConfigurationManager;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.server.storage.RaftStorageDirectory.LogPathAndIndex;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.AutoCloseableLock;
-import org.apache.raft.util.CodeInjectionForTesting;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT;
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY;
-
-/**
- * The RaftLog implementation that writes log entries into segmented files in
- * local disk.
- *
- * The max log segment size is 8MB. The real log segment size may not be
- * exactly equal to this limit. If a log entry's size exceeds 8MB, this entry
- * will be stored in a single segment.
- *
- * There are two types of segments: closed segment and open segment. The former
- * is named as "log_startindex-endindex", the later is named as
- * "log_inprogress_startindex".
- *
- * There can be multiple closed segments but there is at most one open segment.
- * When the open segment reaches the size limit, or the log term increases, we
- * close the open segment and start a new open segment. A closed segment cannot
- * be appended anymore, but it can be truncated in case that a follower's log is
- * inconsistent with the current leader.
- *
- * Every closed segment should be non-empty, i.e., it should contain at least
- * one entry.
- *
- * There should not be any gap between segments. The first segment may not start
- * from index 0 since there may be snapshots as log compaction. The last index
- * in segments should be no smaller than the last index of snapshot, otherwise
- * we may have hole when append further log.
- */
-public class SegmentedRaftLog extends RaftLog {
- static final String HEADER_STR = "RAFTLOG1";
- static final byte[] HEADER_BYTES = HEADER_STR.getBytes(Charsets.UTF_8);
-
- /**
- * I/O task definitions.
- */
- static abstract class Task {
- private boolean done = false;
-
- synchronized void done() {
- done = true;
- notifyAll();
- }
-
- synchronized void waitForDone() throws InterruptedException {
- while (!done) {
- wait();
- }
- }
-
- abstract void execute() throws IOException;
-
- abstract long getEndIndex();
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "-" + getEndIndex();
- }
- }
- private static final ThreadLocal<Task> myTask = new ThreadLocal<>();
-
- private final RaftStorage storage;
- private final RaftLogCache cache;
- private final RaftLogWorker fileLogWorker;
- private final long segmentMaxSize;
-
- public SegmentedRaftLog(String selfId, RaftServerImpl server, RaftStorage storage,
- long lastIndexInSnapshot, RaftProperties properties) throws IOException {
- super(selfId);
- this.storage = storage;
- this.segmentMaxSize = properties.getLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY,
- RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT);
- cache = new RaftLogCache();
- fileLogWorker = new RaftLogWorker(server, storage, properties);
- lastCommitted.set(lastIndexInSnapshot);
- }
-
- @Override
- public void open(ConfigurationManager confManager, long lastIndexInSnapshot)
- throws IOException {
- loadLogSegments(confManager, lastIndexInSnapshot);
- File openSegmentFile = null;
- if (cache.getOpenSegment() != null) {
- openSegmentFile = storage.getStorageDir()
- .getOpenLogFile(cache.getOpenSegment().getStartIndex());
- }
- fileLogWorker.start(Math.max(cache.getEndIndex(), lastIndexInSnapshot),
- openSegmentFile);
- super.open(confManager, lastIndexInSnapshot);
- }
-
- @Override
- public long getStartIndex() {
- return cache.getStartIndex();
- }
-
- private void loadLogSegments(ConfigurationManager confManager,
- long lastIndexInSnapshot) throws IOException {
- try(AutoCloseableLock writeLock = writeLock()) {
- List<LogPathAndIndex> paths = storage.getStorageDir().getLogSegmentFiles();
- for (LogPathAndIndex pi : paths) {
- LogSegment logSegment = parseLogSegment(pi, confManager);
- cache.addSegment(logSegment);
- }
-
- // if the largest index is smaller than the last index in snapshot, we do
- // not load the log to avoid holes between log segments. This may happen
- // when the local I/O worker is too slow to persist log (slower than
- // committing the log and taking snapshot)
- if (!cache.isEmpty() && cache.getEndIndex() < lastIndexInSnapshot) {
- LOG.warn("End log index {} is smaller than last index in snapshot {}",
- cache.getEndIndex(), lastIndexInSnapshot);
- cache.clear();
- // TODO purge all segment files
- }
- }
- }
-
- private LogSegment parseLogSegment(LogPathAndIndex pi,
- ConfigurationManager confManager) throws IOException {
- final boolean isOpen = pi.endIndex == RaftServerConstants.INVALID_LOG_INDEX;
- return LogSegment.loadSegment(pi.path.toFile(), pi.startIndex, pi.endIndex,
- isOpen, confManager);
- }
-
- @Override
- public LogEntryProto get(long index) {
- checkLogState();
- try(AutoCloseableLock readLock = readLock()) {
- return cache.getEntry(index);
- }
- }
-
- @Override
- public LogEntryProto[] getEntries(long startIndex, long endIndex) {
- checkLogState();
- try(AutoCloseableLock readLock = readLock()) {
- return cache.getEntries(startIndex, endIndex);
- }
- }
-
- @Override
- public LogEntryProto getLastEntry() {
- checkLogState();
- try(AutoCloseableLock readLock = readLock()) {
- return cache.getLastEntry();
- }
- }
-
- /**
- * The method, along with {@link #appendEntry} and
- * {@link #append(LogEntryProto...)} need protection of RaftServer's lock.
- */
- @Override
- void truncate(long index) {
- checkLogState();
- try(AutoCloseableLock writeLock = writeLock()) {
- RaftLogCache.TruncationSegments ts = cache.truncate(index);
- if (ts != null) {
- Task task = fileLogWorker.truncate(ts);
- myTask.set(task);
- }
- }
- }
-
- @Override
- void appendEntry(LogEntryProto entry) {
- checkLogState();
- try(AutoCloseableLock writeLock = writeLock()) {
- final LogSegment currentOpenSegment = cache.getOpenSegment();
- if (currentOpenSegment == null) {
- cache.addSegment(LogSegment.newOpenSegment(entry.getIndex()));
- fileLogWorker.startLogSegment(getNextIndex());
- } else if (isSegmentFull(currentOpenSegment, entry)) {
- cache.rollOpenSegment(true);
- fileLogWorker.rollLogSegment(currentOpenSegment);
- } else if (currentOpenSegment.numOfEntries() > 0 &&
- currentOpenSegment.getLastRecord().entry.getTerm() != entry.getTerm()) {
- // the term changes
- final long currentTerm = currentOpenSegment.getLastRecord().entry
- .getTerm();
- Preconditions.checkState(currentTerm < entry.getTerm(),
- "open segment's term %s is larger than the new entry's term %s",
- currentTerm, entry.getTerm());
- cache.rollOpenSegment(true);
- fileLogWorker.rollLogSegment(currentOpenSegment);
- }
-
- cache.appendEntry(entry);
- myTask.set(fileLogWorker.writeLogEntry(entry));
- }
- }
-
- private boolean isSegmentFull(LogSegment segment, LogEntryProto entry) {
- if (segment.getTotalSize() >= segmentMaxSize) {
- return true;
- } else {
- final long entrySize = LogSegment.getEntrySize(entry);
- // if entry size is greater than the max segment size, write it directly
- // into the current segment
- return entrySize <= segmentMaxSize &&
- segment.getTotalSize() + entrySize > segmentMaxSize;
- }
- }
-
- @Override
- public void append(LogEntryProto... entries) {
- checkLogState();
- if (entries == null || entries.length == 0) {
- return;
- }
- try(AutoCloseableLock writeLock = writeLock()) {
- Iterator<LogEntryProto> iter = cache.iterator(entries[0].getIndex());
- int index = 0;
- long truncateIndex = -1;
- for (; iter.hasNext() && index < entries.length; index++) {
- LogEntryProto storedEntry = iter.next();
- Preconditions.checkState(
- storedEntry.getIndex() == entries[index].getIndex(),
- "The stored entry's index %s is not consistent with" +
- " the received entries[%s]'s index %s", storedEntry.getIndex(),
- index, entries[index].getIndex());
-
- if (storedEntry.getTerm() != entries[index].getTerm()) {
- // we should truncate from the storedEntry's index
- truncateIndex = storedEntry.getIndex();
- break;
- }
- }
- if (truncateIndex != -1) {
- // truncate from truncateIndex
- truncate(truncateIndex);
- }
- // append from entries[index]
- for (int i = index; i < entries.length; i++) {
- appendEntry(entries[i]);
- }
- }
- }
-
- @Override
- public void logSync() throws InterruptedException {
- CodeInjectionForTesting.execute(LOG_SYNC, getSelfId(), null);
- final Task task = myTask.get();
- if (task != null) {
- task.waitForDone();
- }
- }
-
- @Override
- public long getLatestFlushedIndex() {
- return fileLogWorker.getFlushedIndex();
- }
-
- /**
- * {@inheritDoc}
- *
- * This operation is protected by the RaftServer's lock
- */
- @Override
- public void writeMetadata(long term, String votedFor) throws IOException {
- storage.getMetaFile().set(term, votedFor);
- }
-
- @Override
- public Metadata loadMetadata() throws IOException {
- return new Metadata(storage.getMetaFile().getVotedFor(),
- storage.getMetaFile().getTerm());
- }
-
- @Override
- public void syncWithSnapshot(long lastSnapshotIndex) {
- fileLogWorker.syncWithSnapshot(lastSnapshotIndex);
- // TODO purge log files and normal/tmp/corrupt snapshot files
- // if the last index in snapshot is larger than the index of the last
- // log entry, we should delete all the log entries and their cache to avoid
- // gaps between log segments.
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- fileLogWorker.close();
- storage.close();
- }
-
- @VisibleForTesting
- RaftLogCache getRaftLogCache() {
- return cache;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/SnapshotManager.java b/raft-server/src/main/java/org/apache/raft/server/storage/SnapshotManager.java
deleted file mode 100644
index 8ab2833..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/SnapshotManager.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.io.MD5Hash;
-import org.apache.raft.shaded.proto.RaftProtos.FileChunkProto;
-import org.apache.raft.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.util.FileUtils;
-import org.apache.raft.util.MD5FileUtil;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-
-/**
- * Manage snapshots of a raft peer.
- * TODO: snapshot should be treated as compaction log thus can be merged into
- * RaftLog. In this way we can have a unified getLastTermIndex interface.
- */
-public class SnapshotManager {
- private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);
-
- private final RaftStorage storage;
- private final String selfId;
-
- public SnapshotManager(RaftStorage storage, String selfId)
- throws IOException {
- this.storage = storage;
- this.selfId = selfId;
- }
-
- public void installSnapshot(StateMachine stateMachine,
- InstallSnapshotRequestProto request) throws IOException {
- final long lastIncludedIndex = request.getTermIndex().getIndex();
- final RaftStorageDirectory dir = storage.getStorageDir();
-
- File tmpDir = dir.getNewTempDir();
- tmpDir.mkdirs();
- tmpDir.deleteOnExit();
-
- LOG.info("Installing snapshot:{}, to tmp dir:{}", request, tmpDir);
-
- // TODO: Make sure that subsequent requests for the same installSnapshot are coming in order,
- // and are not lost when whole request cycle is done. Check requestId and requestIndex here
-
- for (FileChunkProto chunk : request.getFileChunksList()) {
- SnapshotInfo pi = stateMachine.getLatestSnapshot();
- if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) {
- throw new IOException("There exists snapshot file "
- + pi.getFiles() + " in " + selfId
- + " with endIndex >= lastIncludedIndex " + lastIncludedIndex);
- }
-
- String fileName = chunk.getFilename(); // this is relative to the root dir
- // TODO: assumes flat layout inside SM dir
- File tmpSnapshotFile = new File(tmpDir,
- new File(dir.getRoot(), fileName).getName());
-
- FileOutputStream out = null;
- try {
- // if offset is 0, delete any existing temp snapshot file if it has the
- // same last index.
- if (chunk.getOffset() == 0) {
- if (tmpSnapshotFile.exists()) {
- FileUtils.fullyDelete(tmpSnapshotFile);
- }
- // create the temp snapshot file and put padding inside
- out = new FileOutputStream(tmpSnapshotFile);
- } else {
- Preconditions.checkState(tmpSnapshotFile.exists());
- out = new FileOutputStream(tmpSnapshotFile, true);
- FileChannel fc = out.getChannel();
- fc.position(chunk.getOffset());
- }
-
- // write data to the file
- out.write(chunk.getData().toByteArray());
- } finally {
- RaftUtils.cleanup(null, out);
- }
-
- // rename the temp snapshot file if this is the last chunk. also verify
- // the md5 digest and create the md5 meta-file.
- if (chunk.getDone()) {
- final MD5Hash expectedDigest =
- new MD5Hash(chunk.getFileDigest().toByteArray());
- // calculate the checksum of the snapshot file and compare it with the
- // file digest in the request
- MD5Hash digest = MD5FileUtil.computeMd5ForFile(tmpSnapshotFile);
- if (!digest.equals(expectedDigest)) {
- LOG.warn("The snapshot md5 digest {} does not match expected {}",
- digest, expectedDigest);
- // rename the temp snapshot file to .corrupt
-// NativeIO.renameTo(tmpSnapshotFile, // TODO:
-// dir.getCorruptSnapshotFile(lastIncludedTerm, lastIncludedIndex));
- throw new IOException("MD5 mismatch for snapshot-" + lastIncludedIndex
- + " installation");
- } else {
- MD5FileUtil.saveMD5File(tmpSnapshotFile, digest);
- }
- }
- }
-
- if (request.getDone()) {
- LOG.info("Install snapshot is done, renaming tnp dir:{} to:{}",
- tmpDir, dir.getStateMachineDir());
- dir.getStateMachineDir().delete();
- tmpDir.renameTo(dir.getStateMachineDir());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java b/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java
deleted file mode 100644
index ccc52c7..0000000
--- a/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.raft.statemachine;
-
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.Message;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.server.impl.RaftConfiguration;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.server.storage.RaftStorage;
-import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto;
-import org.apache.raft.util.LifeCycle;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Base implementation for StateMachines.
- */
-public class BaseStateMachine implements StateMachine {
-
- protected RaftProperties properties;
- protected RaftStorage storage;
- protected RaftConfiguration raftConf;
- protected final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName());
-
- @Override
- public LifeCycle.State getLifeCycleState() {
- return lifeCycle.getCurrentState();
- }
-
- @Override
- public void initialize(String id, RaftProperties properties, RaftStorage storage)
- throws IOException {
- lifeCycle.setName(getClass().getSimpleName() + ":" + id);
- this.properties = properties;
- this.storage = storage;
- }
-
- @Override
- public void setRaftConfiguration(RaftConfiguration conf) {
- this.raftConf = conf;
- }
-
- @Override
- public RaftConfiguration getRaftConfiguration() {
- return this.raftConf;
- }
-
- @Override
- public SnapshotInfo getLatestSnapshot() {
- return getStateMachineStorage().getLatestSnapshot();
- }
-
- @Override
- public void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException {
- // do nothing
- }
-
- public void pause() {
- }
-
- @Override
- public void reinitialize(String id, RaftProperties properties, RaftStorage storage)
- throws IOException {
- }
-
- @Override
- public TransactionContext applyTransactionSerial(TransactionContext trx) throws IOException {
- return trx;
- }
-
- @Override
- public CompletableFuture<Message> applyTransaction(TransactionContext trx) throws IOException {
- // return the same message contained in the entry
- Message msg = () -> trx.getLogEntry().get().getSmLogEntry().getData();
- return CompletableFuture.completedFuture(msg);
- }
-
- @Override
- public long takeSnapshot() throws IOException {
- return RaftServerConstants.INVALID_LOG_INDEX;
- }
-
- @Override
- public StateMachineStorage getStateMachineStorage() {
- return new StateMachineStorage() {
- @Override
- public void init(RaftStorage raftStorage) throws IOException {
- }
-
- @Override
- public SnapshotInfo getLatestSnapshot() {
- return null;
- }
-
- @Override
- public void format() throws IOException {
- }
- };
- }
-
- @Override
- public CompletableFuture<RaftClientReply> query(
- RaftClientRequest request) {
- return null;
- }
-
- @Override
- public TransactionContext startTransaction(RaftClientRequest request)
- throws IOException {
- return new TransactionContext(this, request,
- SMLogEntryProto.newBuilder()
- .setData(request.getMessage().getContent())
- .build());
- }
-
- @Override
- public TransactionContext cancelTransaction(TransactionContext trx) throws IOException {
- return trx;
- }
-
- @Override
- public TransactionContext preAppendTransaction(TransactionContext trx) throws IOException {
- return trx;
- }
-
- @Override
- public void close() throws IOException {
- // do nothing
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/FileListSnapshotInfo.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/FileListSnapshotInfo.java b/raft-server/src/main/java/org/apache/raft/statemachine/FileListSnapshotInfo.java
deleted file mode 100644
index b65fc13..0000000
--- a/raft-server/src/main/java/org/apache/raft/statemachine/FileListSnapshotInfo.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.FileInfo;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Each snapshot has a list of files.
- *
- * The objects of this class are immutable.
- */
-public class FileListSnapshotInfo implements SnapshotInfo {
- private final TermIndex termIndex;
- private final List<FileInfo> files;
-
- public FileListSnapshotInfo(List<FileInfo> files, long term, long index) {
- this.termIndex = TermIndex.newTermIndex(term, index);
- this.files = Collections.unmodifiableList(files);
- }
-
- @Override
- public TermIndex getTermIndex() {
- return termIndex;
- }
-
- @Override
- public long getTerm() {
- return termIndex.getTerm();
- }
-
- @Override
- public long getIndex() {
- return termIndex.getIndex();
- }
-
- @Override
- public List<FileInfo> getFiles() {
- return files;
- }
-
- @Override
- public String toString() {
- return termIndex + ":" + files;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java b/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java
deleted file mode 100644
index a779f98..0000000
--- a/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.raft.io.MD5Hash;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.FileInfo;
-import org.apache.raft.server.storage.RaftStorage;
-import org.apache.raft.util.AtomicFileOutputStream;
-import org.apache.raft.util.MD5FileUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * A StateMachineStorage that stores the snapshot in a single file.
- */
-public class SimpleStateMachineStorage implements StateMachineStorage {
-
- private static final Logger LOG = LoggerFactory.getLogger(SimpleStateMachineStorage.class);
-
- static final String SNAPSHOT_FILE_PREFIX = "snapshot";
- static final String CORRUPT_SNAPSHOT_FILE_SUFFIX = ".corrupt";
- /** snapshot.term_index */
- static final Pattern SNAPSHOT_REGEX =
- Pattern.compile(SNAPSHOT_FILE_PREFIX + "\\.(\\d+)_(\\d+)");
-
- private RaftStorage raftStorage;
- private File smDir = null;
-
- private volatile SingleFileSnapshotInfo currentSnapshot = null;
-
- public void init(RaftStorage raftStorage) throws IOException {
- this.raftStorage = raftStorage;
- this.smDir = raftStorage.getStorageDir().getStateMachineDir();
- loadLatestSnapshot();
- }
-
- @Override
- public void format() throws IOException {
- // TODO
- }
-
- @VisibleForTesting
- public static TermIndex getTermIndexFromSnapshotFile(File file) {
- final String name = file.getName();
- final Matcher m = SNAPSHOT_REGEX.matcher(name);
- if (!m.matches()) {
- throw new IllegalArgumentException("File \"" + file
- + "\" does not match snapshot file name pattern \""
- + SNAPSHOT_REGEX + "\"");
- }
- final long term = Long.parseLong(m.group(1));
- final long index = Long.parseLong(m.group(2));
- return TermIndex.newTermIndex(term, index);
- }
-
- protected static String getTmpSnapshotFileName(long term, long endIndex) {
- return getSnapshotFileName(term, endIndex) + AtomicFileOutputStream.TMP_EXTENSION;
- }
-
- protected static String getCorruptSnapshotFileName(long term, long endIndex) {
- return getSnapshotFileName(term, endIndex) + CORRUPT_SNAPSHOT_FILE_SUFFIX;
- }
-
- public File getSnapshotFile(long term, long endIndex) {
- return new File(smDir, getSnapshotFileName(term, endIndex));
- }
-
- protected File getTmpSnapshotFile(long term, long endIndex) {
- return new File(smDir, getTmpSnapshotFileName(term, endIndex));
- }
-
- protected File getCorruptSnapshotFile(long term, long endIndex) {
- return new File(smDir, getCorruptSnapshotFileName(term, endIndex));
- }
-
- public SingleFileSnapshotInfo findLatestSnapshot() throws IOException {
- SingleFileSnapshotInfo latest = null;
- try (DirectoryStream<Path> stream =
- Files.newDirectoryStream(smDir.toPath())) {
- for (Path path : stream) {
- Matcher matcher = SNAPSHOT_REGEX.matcher(path.getFileName().toString());
- if (matcher.matches()) {
- final long endIndex = Long.parseLong(matcher.group(2));
- if (latest == null || endIndex > latest.getIndex()) {
- final long term = Long.parseLong(matcher.group(1));
- MD5Hash fileDigest = MD5FileUtil.readStoredMd5ForFile(path.toFile());
- final FileInfo fileInfo = new FileInfo(path, fileDigest);
- latest = new SingleFileSnapshotInfo(fileInfo, term, endIndex);
- }
- }
- }
- }
- return latest;
- }
-
- public void loadLatestSnapshot() throws IOException {
- this.currentSnapshot = findLatestSnapshot();
- }
-
- public static String getSnapshotFileName(long term, long endIndex) {
- return SNAPSHOT_FILE_PREFIX + "." + term + "_" + endIndex;
- }
-
- @Override
- public SingleFileSnapshotInfo getLatestSnapshot() {
- return currentSnapshot;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/SingleFileSnapshotInfo.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/SingleFileSnapshotInfo.java b/raft-server/src/main/java/org/apache/raft/statemachine/SingleFileSnapshotInfo.java
deleted file mode 100644
index 6b01e17..0000000
--- a/raft-server/src/main/java/org/apache/raft/statemachine/SingleFileSnapshotInfo.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import org.apache.raft.server.storage.FileInfo;
-
-import java.util.Arrays;
-
-/**
- * Each snapshot only has a single file.
- *
- * The objects of this class are immutable.
- */
-public class SingleFileSnapshotInfo extends FileListSnapshotInfo {
- public SingleFileSnapshotInfo(FileInfo fileInfo, long term, long endIndex) {
- super(Arrays.asList(fileInfo), term, endIndex);
- }
-
- /** @return the file associated with the snapshot. */
- public FileInfo getFile() {
- return getFiles().get(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/SnapshotInfo.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/SnapshotInfo.java b/raft-server/src/main/java/org/apache/raft/statemachine/SnapshotInfo.java
deleted file mode 100644
index 0fdcbc3..0000000
--- a/raft-server/src/main/java/org/apache/raft/statemachine/SnapshotInfo.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.FileInfo;
-
-import java.util.List;
-
-/**
- * SnapshotInfo represents a durable state by the state machine. The state machine implementation is
- * responsible for the layout of the snapshot files as well as making the data durable. Latest term,
- * latest index, and the raft configuration must be saved together with any data files in the
- * snapshot.
- */
-public interface SnapshotInfo {
-
- /**
- * Returns the term and index corresponding to this snapshot.
- * @return The term and index corresponding to this snapshot.
- */
- TermIndex getTermIndex();
-
- /**
- * Returns the term corresponding to this snapshot.
- * @return The term corresponding to this snapshot.
- */
- long getTerm();
-
- /**
- * Returns the index corresponding to this snapshot.
- * @return The index corresponding to this snapshot.
- */
- long getIndex();
-
- /**
- * Returns a list of files corresponding to this snapshot. This list should include all
- * the files that the state machine keeps in its data directory. This list of files will be
- * copied as to other replicas in install snapshot RPCs.
- * @return a list of Files corresponding to the this snapshot.
- */
- List<FileInfo> getFiles();
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/StateMachine.java b/raft-server/src/main/java/org/apache/raft/statemachine/StateMachine.java
deleted file mode 100644
index 3dedf88..0000000
--- a/raft-server/src/main/java/org/apache/raft/statemachine/StateMachine.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.Message;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.server.RaftServerConfigKeys;
-import org.apache.raft.server.impl.RaftConfiguration;
-import org.apache.raft.server.storage.RaftStorage;
-import org.apache.raft.util.LifeCycle;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * StateMachine is the entry point for the custom implementation of replicated state as defined in
- * the "State Machine Approach" in the literature
- * (see https://en.wikipedia.org/wiki/State_machine_replication).
- */
-public interface StateMachine extends Closeable {
- /**
- * Initializes the State Machine with the given properties and storage. The state machine is
- * responsible reading the latest snapshot from the file system (if any) and initialize itself
- * with the latest term and index there including all the edits.
- */
- void initialize(String id, RaftProperties properties, RaftStorage storage)
- throws IOException;
-
- /**
- * Returns the lifecycle state for this StateMachine.
- * @return the lifecycle state.
- */
- LifeCycle.State getLifeCycleState();
-
- /**
- * Pauses the state machine. On return, the state machine should have closed all open files so
- * that a new snapshot can be installed.
- */
- void pause();
-
- /**
- * Re-initializes the State Machine in PAUSED state with the given properties and storage. The
- * state machine is responsible reading the latest snapshot from the file system (if any) and
- * initialize itself with the latest term and index there including all the edits.
- */
- void reinitialize(String id, RaftProperties properties, RaftStorage storage)
- throws IOException;
-
- /**
- * Dump the in-memory state into a snapshot file in the RaftStorage. The
- * StateMachine implementation can decide 1) its own snapshot format, 2) when
- * a snapshot is taken, and 3) how the snapshot is taken (e.g., whether the
- * snapshot blocks the state machine, and whether to purge log entries after
- * a snapshot is done).
- *
- * In the meanwhile, when the size of raft log outside of the latest snapshot
- * exceeds certain threshold, the RaftServer may choose to trigger a snapshot
- * if {@link RaftServerConfigKeys#RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY} is
- * enabled.
- *
- * The snapshot should include the latest raft configuration.
- *
- * @return the largest index of the log entry that has been applied to the
- * state machine and also included in the snapshot. Note the log purge
- * should be handled separately.
- */
- // TODO: refactor this
- long takeSnapshot() throws IOException;
-
- /**
- * Record the RaftConfiguration in the state machine. The RaftConfiguration
- * should also be stored in the snapshot.
- */
- void setRaftConfiguration(RaftConfiguration conf);
-
- /**
- * @return the latest raft configuration recorded in the state machine.
- */
- RaftConfiguration getRaftConfiguration();
-
- /**
- * @return StateMachineStorage to interact with the durability guarantees provided by the
- * state machine.
- */
- StateMachineStorage getStateMachineStorage();
-
- /**
- * Returns the information for the latest durable snapshot.
- */
- SnapshotInfo getLatestSnapshot();
-
- /**
- * Query the state machine. The request must be read-only.
- * TODO: extend RaftClientRequest to have a read-only request subclass.
- */
- CompletableFuture<RaftClientReply> query(RaftClientRequest request);
-
- /**
- * Validate/pre-process the incoming update request in the state machine.
- * @return the content to be written to the log entry. Null means the request
- * should be rejected.
- * @throws IOException thrown by the state machine while validation
- */
- TransactionContext startTransaction(RaftClientRequest request)
- throws IOException;
-
- /**
- * This is called before the transaction passed from the StateMachine is appended to the raft log.
- * This method will be called from log append and having the same strict serial order that the
- * transactions will have in the RAFT log. Since this is called serially in the critical path of
- * log append, it is important to do only required operations here.
- * @return The Transaction context.
- */
- TransactionContext preAppendTransaction(TransactionContext trx) throws IOException;
-
- /**
- * Called to notify the state machine that the Transaction passed cannot be appended (or synced).
- * The exception field will indicate whether there was an exception or not.
- * @param trx the transaction to cancel
- * @return cancelled transaction
- */
- TransactionContext cancelTransaction(TransactionContext trx) throws IOException;
-
- /**
- * Called for transactions that have been committed to the RAFT log. This step is called
- * sequentially in strict serial order that the transactions have been committed in the log.
- * The SM is expected to do only necessary work, and leave the actual apply operation to the
- * applyTransaction calls that can happen concurrently.
- * @param trx the transaction state including the log entry that has been committed to a quorum
- * of the raft peers
- * @return The Transaction context.
- */
- TransactionContext applyTransactionSerial(TransactionContext trx) throws IOException;
-
- /**
- * Apply a committed log entry to the state machine. This method can be called concurrently with
- * the other calls, and there is no guarantee that the calls will be ordered according to the
- * log commit order.
- * @param trx the transaction state including the log entry that has been committed to a quorum
- * of the raft peers
- */
- // TODO: We do not need to return CompletableFuture
- CompletableFuture<Message> applyTransaction(TransactionContext trx) throws IOException;
-
- /**
- * Notify the state machine that the raft peer is no longer leader.
- */
- void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/StateMachineStorage.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/StateMachineStorage.java b/raft-server/src/main/java/org/apache/raft/statemachine/StateMachineStorage.java
deleted file mode 100644
index 30005f9..0000000
--- a/raft-server/src/main/java/org/apache/raft/statemachine/StateMachineStorage.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import org.apache.raft.server.storage.RaftStorage;
-import java.io.IOException;
-
-public interface StateMachineStorage {
-
- void init(RaftStorage raftStorage) throws IOException;
-
- /**
- * Returns the information for the latest durable snapshot.
- */
- SnapshotInfo getLatestSnapshot();
-
- // TODO: StateMachine can decide to compact the files independently of concurrent install snapshot
- // etc requests. We should have ref counting for the SnapshotInfo with a release mechanism
- // so that raft server will release the files after the snapshot file copy in case a compaction
- // is waiting for deleting these files.
-
- void format() throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/TransactionContext.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/TransactionContext.java b/raft-server/src/main/java/org/apache/raft/statemachine/TransactionContext.java
deleted file mode 100644
index 675ada9..0000000
--- a/raft-server/src/main/java/org/apache/raft/statemachine/TransactionContext.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Optional;
-
-/**
- * Context for a transaction.
- * The transaction might have originated from a client request, or it
- * maybe coming from another replica of the state machine through the RAFT log.
- * {@link TransactionContext} can be created from
- * either the {@link StateMachine} or the state machine updater.
- *
- * In the first case, the {@link StateMachine} is a leader. When it receives
- * a {@link StateMachine#startTransaction(RaftClientRequest)} request, it returns
- * a {@link TransactionContext} with the changes from the {@link StateMachine}.
- * The same context will be passed back to the {@link StateMachine}
- * via the {@link StateMachine#applyTransaction(TransactionContext)} call
- * or the {@link StateMachine#notifyNotLeader(Collection)} call.
- *
- * In the second case, the {@link StateMachine} is a follower.
- * The {@link TransactionContext} will be a committed entry coming from
- * the RAFT log from the leader.
- */
-public class TransactionContext {
-
- /** The {@link StateMachine} that originated the transaction. */
- private final StateMachine stateMachine;
-
- /** Original request from the client */
- private Optional<RaftClientRequest> clientRequest = Optional.empty();
-
- /** Exception from the {@link StateMachine} or from the log */
- private Optional<Exception> exception = Optional.empty();
-
- /** Data from the {@link StateMachine} */
- private Optional<SMLogEntryProto> smLogEntryProto = Optional.empty();
-
- /**
- * Context specific to the state machine.
- * The {@link StateMachine} can use this object to carry state between
- * {@link StateMachine#startTransaction(RaftClientRequest)} and
- * {@link StateMachine#applyTransaction(TransactionContext)}.
- */
- private Optional<Object> stateMachineContext = Optional.empty();
-
- /**
- * Whether to commit the transaction to the RAFT Log.
- * In some cases the {@link StateMachine} may want to indicate
- * that the transaction should not be committed
- */
- private boolean shouldCommit = true;
-
- /** Committed LogEntry. */
- private Optional<LogEntryProto> logEntry = Optional.empty();
-
- private TransactionContext(StateMachine stateMachine) {
- this.stateMachine = stateMachine;
- }
-
- /** The same as this(stateMachine, clientRequest, smLogEntryProto, null). */
- public TransactionContext(
- StateMachine stateMachine, RaftClientRequest clientRequest,
- SMLogEntryProto smLogEntryProto) {
- this(stateMachine, clientRequest, smLogEntryProto, null);
- }
-
- /**
- * Construct a {@link TransactionContext} from a client request.
- * Used by the state machine to start a transaction
- * and send the Log entry representing the transaction data
- * to be applied to the raft log.
- */
- public TransactionContext(
- StateMachine stateMachine, RaftClientRequest clientRequest,
- SMLogEntryProto smLogEntryProto, Object stateMachineContext) {
- this(stateMachine);
- this.clientRequest = Optional.of(clientRequest);
- this.smLogEntryProto = Optional.ofNullable(smLogEntryProto);
- this.stateMachineContext = Optional.ofNullable(stateMachineContext);
- }
-
- /** The same as this(stateMachine, clientRequest, exception, null). */
- public TransactionContext(
- StateMachine stateMachine, RaftClientRequest clientRequest,
- Exception exception) {
- this(stateMachine, clientRequest, exception, null);
- }
-
- /**
- * Construct a {@link TransactionContext} from a client request to signal
- * an exception so that the RAFT server will fail the request on behalf
- * of the {@link StateMachine}.
- */
- public TransactionContext(
- StateMachine stateMachine, RaftClientRequest clientRequest,
- Exception exception, Object stateMachineContext) {
- this(stateMachine);
- this.clientRequest = Optional.of(clientRequest);
- this.exception = Optional.of(exception);
- this.stateMachineContext = Optional.ofNullable(stateMachineContext);
- }
-
- /**
- * Construct a {@link TransactionContext} from a {@link LogEntryProto}.
- * Used by followers for applying committed entries to the state machine.
- * @param logEntry the log entry to be applied
- */
- public TransactionContext(StateMachine stateMachine, LogEntryProto logEntry) {
- this(stateMachine);
- this.smLogEntryProto = Optional.of(logEntry.getSmLogEntry());
- this.logEntry = Optional.of(logEntry);
- }
-
- public Optional<RaftClientRequest> getClientRequest() {
- return this.clientRequest;
- }
-
- public Optional<SMLogEntryProto> getSMLogEntry() {
- return this.smLogEntryProto;
- }
-
- public Optional<Exception> getException() {
- return this.exception;
- }
-
- public TransactionContext setStateMachineContext(Object stateMachineContext) {
- this.stateMachineContext = Optional.ofNullable(stateMachineContext);
- return this;
- }
-
- public Optional<Object> getStateMachineContext() {
- return stateMachineContext;
- }
-
- public TransactionContext setLogEntry(LogEntryProto logEntry) {
- this.logEntry = Optional.of(logEntry);
- return this;
- }
-
- public TransactionContext setSmLogEntryProto(SMLogEntryProto smLogEntryProto) {
- this.smLogEntryProto = Optional.of(smLogEntryProto);
- return this;
- }
-
- public Optional<LogEntryProto> getLogEntry() {
- return logEntry;
- }
-
- private TransactionContext setException(IOException ioe) {
- assert !this.exception.isPresent();
- this.exception = Optional.of(ioe);
- return this;
- }
-
- public TransactionContext setShouldCommit(boolean shouldCommit) {
- this.shouldCommit = shouldCommit;
- return this;
- }
-
- public boolean shouldCommit() {
- // TODO: Hook this up in the server to bypass the RAFT Log and send back a response to client
- return this.shouldCommit;
- }
-
- // proxy StateMachine methods. We do not want to expose the SM to the RaftLog
-
- /**
- * This is called before the transaction passed from the StateMachine is appended to the raft log.
- * This method will be called from log append and having the same strict serial order that the
- * Transactions will have in the RAFT log. Since this is called serially in the critical path of
- * log append, it is important to do only required operations here.
- * @return The Transaction context.
- */
- public TransactionContext preAppendTransaction() throws IOException {
- return stateMachine.preAppendTransaction(this);
- }
-
- /**
- * Called to notify the state machine that the Transaction passed cannot be appended (or synced).
- * The exception field will indicate whether there was an exception or not.
- * @return cancelled transaction
- */
- public TransactionContext cancelTransaction() throws IOException {
- // TODO: This is not called from Raft server / log yet. When an IOException happens, we should
- // call this to let the SM know that Transaction cannot be synced
- return stateMachine.cancelTransaction(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java b/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java
deleted file mode 100644
index c66ef8f..0000000
--- a/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java
+++ /dev/null
@@ -1,444 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.client.RaftClient;
-import org.apache.raft.client.RaftClientRequestSender;
-import org.apache.raft.client.impl.RaftClientImpl;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.server.RaftServerConfigKeys;
-import org.apache.raft.server.impl.DelayLocalExecutionInjection;
-import org.apache.raft.server.impl.RaftConfiguration;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.server.RaftServerRpc;
-import org.apache.raft.server.storage.MemoryRaftLog;
-import org.apache.raft.server.storage.RaftLog;
-import org.apache.raft.statemachine.BaseStateMachine;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.util.ExitUtils;
-import org.apache.raft.util.FileUtils;
-import org.apache.raft.util.RaftUtils;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.stream.Collectors;
-
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT;
-
-public abstract class MiniRaftCluster {
- public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class);
- public static final DelayLocalExecutionInjection logSyncDelay =
- new DelayLocalExecutionInjection(RaftLog.LOG_SYNC);
-
- public static final String CLASS_NAME = MiniRaftCluster.class.getSimpleName();
- public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class";
- public static final Class<? extends StateMachine> STATEMACHINE_CLASS_DEFAULT = BaseStateMachine.class;
-
- public static abstract class Factory<CLUSTER extends MiniRaftCluster> {
- public abstract CLUSTER newCluster(
- String[] ids, RaftProperties prop, boolean formatted)
- throws IOException;
-
- public CLUSTER newCluster(
- int numServer, RaftProperties prop, boolean formatted)
- throws IOException {
- return newCluster(generateIds(numServer, 0), prop, formatted);
- }
- }
-
- public static abstract class RpcBase extends MiniRaftCluster {
- public RpcBase(String[] ids, RaftProperties properties, boolean formatted) {
- super(ids, properties, formatted);
- }
-
- protected abstract RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException;
-
- @Override
- protected void setPeerRpc() throws IOException {
- for (RaftPeer p : conf.getPeers()) {
- setPeerRpc(p);
- }
- }
-
- @Override
- public void restartServer(String id, boolean format) throws IOException {
- super.restartServer(id, format);
- setPeerRpc(conf.getPeer(id)).start();
- }
-
- @Override
- public void setBlockRequestsFrom(String src, boolean block) {
- RaftTestUtil.setBlockRequestsFrom(src, block);
- }
- }
-
- public static class PeerChanges {
- public final RaftPeer[] allPeersInNewConf;
- public final RaftPeer[] newPeers;
- public final RaftPeer[] removedPeers;
-
- public PeerChanges(RaftPeer[] all, RaftPeer[] newPeers, RaftPeer[] removed) {
- this.allPeersInNewConf = all;
- this.newPeers = newPeers;
- this.removedPeers = removed;
- }
- }
-
- public static RaftConfiguration initConfiguration(String[] ids) {
- return RaftConfiguration.newBuilder()
- .setConf(Arrays.stream(ids).map(RaftPeer::new).collect(Collectors.toList()))
- .build();
- }
-
- private static String getBaseDirectory() {
- return System.getProperty("test.build.data", "target/test/data") + "/raft/";
- }
-
- private static void formatDir(String dirStr) {
- final File serverDir = new File(dirStr);
- Preconditions.checkState(FileUtils.fullyDelete(serverDir),
- "Failed to format directory %s", dirStr);
- LOG.info("Formatted directory {}", dirStr);
- }
-
- public static String[] generateIds(int numServers, int base) {
- String[] ids = new String[numServers];
- for (int i = 0; i < numServers; i++) {
- ids[i] = "s" + (i + base);
- }
- return ids;
- }
-
- protected RaftConfiguration conf;
- protected final RaftProperties properties;
- private final String testBaseDir;
- protected final Map<String, RaftServerImpl> servers =
- Collections.synchronizedMap(new LinkedHashMap<>());
-
- public MiniRaftCluster(String[] ids, RaftProperties properties,
- boolean formatted) {
- this.conf = initConfiguration(ids);
- this.properties = new RaftProperties(properties);
- this.testBaseDir = getBaseDirectory();
-
- conf.getPeers().forEach(
- p -> servers.put(p.getId(), newRaftServer(p.getId(), conf, formatted)));
-
- ExitUtils.disableSystemExit();
- }
-
- protected <RPC extends RaftServerRpc> void init(Map<RaftPeer, RPC> peers) {
- LOG.info("peers = " + peers.keySet());
- conf = RaftConfiguration.newBuilder().setConf(peers.keySet()).build();
- for (Map.Entry<RaftPeer, RPC> entry : peers.entrySet()) {
- final RaftServerImpl server = servers.get(entry.getKey().getId());
- server.setInitialConf(conf);
- server.setServerRpc(entry.getValue());
- }
- }
-
- public void start() {
- LOG.info("Starting " + getClass().getSimpleName());
- servers.values().forEach(RaftServerImpl::start);
- }
-
- /**
- * start a stopped server again.
- */
- public void restartServer(String id, boolean format) throws IOException {
- killServer(id);
- servers.remove(id);
- servers.put(id, newRaftServer(id, conf, format));
- }
-
- public final void restart(boolean format) throws IOException {
- servers.values().stream().filter(RaftServerImpl::isAlive)
- .forEach(RaftServerImpl::close);
- List<String> idList = new ArrayList<>(servers.keySet());
- for (String id : idList) {
- servers.remove(id);
- servers.put(id, newRaftServer(id, conf, format));
- }
-
- setPeerRpc();
- start();
- }
-
- protected abstract void setPeerRpc() throws IOException;
-
- public int getMaxTimeout() {
- return properties.getInt(
- RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY,
- RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT);
- }
-
- public RaftConfiguration getConf() {
- return conf;
- }
-
- private RaftServerImpl newRaftServer(String id, RaftConfiguration conf,
- boolean format) {
- final RaftServerImpl s;
- try {
- final String dirStr = testBaseDir + id;
- if (format) {
- formatDir(dirStr);
- }
- properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, dirStr);
- s = new RaftServerImpl(id, conf, properties, getStateMachine4Test(properties));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return s;
- }
-
- static StateMachine getStateMachine4Test(RaftProperties properties) {
- final Class<? extends StateMachine> smClass = properties.getClass(
- STATEMACHINE_CLASS_KEY,
- STATEMACHINE_CLASS_DEFAULT,
- StateMachine.class);
- return RaftUtils.newInstance(smClass);
- }
-
- public abstract RaftClientRequestSender getRaftClientRequestSender();
-
- protected <RPC extends RaftServerRpc> Collection<RaftPeer> addNewPeers(
- Map<RaftPeer, RPC> newPeers, Collection<RaftServerImpl> newServers,
- boolean startService) throws IOException {
- for (Map.Entry<RaftPeer, RPC> entry : newPeers.entrySet()) {
- RaftServerImpl server = servers.get(entry.getKey().getId());
- server.setServerRpc(entry.getValue());
- }
- if (startService) {
- newServers.forEach(RaftServerImpl::start);
- }
- return new ArrayList<>(newPeers.keySet());
- }
-
- protected abstract Collection<RaftPeer> addNewPeers(
- Collection<RaftPeer> newPeers, Collection<RaftServerImpl> newServers,
- boolean startService) throws IOException;
-
- public PeerChanges addNewPeers(int number, boolean startNewPeer)
- throws IOException {
- return addNewPeers(generateIds(number, servers.size()), startNewPeer);
- }
-
- public PeerChanges addNewPeers(String[] ids,
- boolean startNewPeer) throws IOException {
- LOG.info("Add new peers {}", Arrays.asList(ids));
- Collection<RaftPeer> newPeers = new ArrayList<>(ids.length);
- for (String id : ids) {
- newPeers.add(new RaftPeer(id));
- }
-
- // create and add new RaftServers
- final List<RaftServerImpl> newServers = new ArrayList<>(ids.length);
- for (RaftPeer p : newPeers) {
- RaftServerImpl newServer = newRaftServer(p.getId(), conf, true);
- Preconditions.checkArgument(!servers.containsKey(p.getId()));
- servers.put(p.getId(), newServer);
- newServers.add(newServer);
- }
-
- // for hadoop-rpc-enabled peer, we assign inetsocketaddress here
- newPeers = addNewPeers(newPeers, newServers, startNewPeer);
-
- final RaftPeer[] np = newPeers.toArray(new RaftPeer[newPeers.size()]);
- newPeers.addAll(conf.getPeers());
- conf = RaftConfiguration.newBuilder().setConf(newPeers).setLogEntryIndex(0).build();
- RaftPeer[] p = newPeers.toArray(new RaftPeer[newPeers.size()]);
- return new PeerChanges(p, np, new RaftPeer[0]);
- }
-
- public void startServer(String id) {
- RaftServerImpl server = servers.get(id);
- assert server != null;
- server.start();
- }
-
- private RaftPeer getPeer(RaftServerImpl s) {
- return new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress());
- }
-
- /**
- * prepare the peer list when removing some peers from the conf
- */
- public PeerChanges removePeers(int number, boolean removeLeader,
- Collection<RaftPeer> excluded) {
- Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers());
- List<RaftPeer> removedPeers = new ArrayList<>(number);
- if (removeLeader) {
- final RaftPeer leader = getPeer(getLeader());
- assert !excluded.contains(leader);
- peers.remove(leader);
- removedPeers.add(leader);
- }
- List<RaftServerImpl> followers = getFollowers();
- for (int i = 0, removed = 0; i < followers.size() &&
- removed < (removeLeader ? number - 1 : number); i++) {
- RaftPeer toRemove = getPeer(followers.get(i));
- if (!excluded.contains(toRemove)) {
- peers.remove(toRemove);
- removedPeers.add(toRemove);
- removed++;
- }
- }
- conf = RaftConfiguration.newBuilder().setConf(peers).setLogEntryIndex(0).build();
- RaftPeer[] p = peers.toArray(new RaftPeer[peers.size()]);
- return new PeerChanges(p, new RaftPeer[0],
- removedPeers.toArray(new RaftPeer[removedPeers.size()]));
- }
-
- public void killServer(String id) {
- servers.get(id).close();
- }
-
- public String printServers() {
- StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n");
- for (RaftServerImpl s : servers.values()) {
- b.append(" ");
- b.append(s).append("\n");
- }
- return b.toString();
- }
-
- public String printAllLogs() {
- StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n");
- for (RaftServerImpl s : servers.values()) {
- b.append(" ");
- b.append(s).append("\n");
-
- final RaftLog log = s.getState().getLog();
- if (log instanceof MemoryRaftLog) {
- b.append(" ");
- b.append(((MemoryRaftLog) log).getEntryString());
- }
- }
- return b.toString();
- }
-
- public RaftServerImpl getLeader() {
- final List<RaftServerImpl> leaders = new ArrayList<>();
- servers.values().stream()
- .filter(s -> s.isAlive() && s.isLeader())
- .forEach(s -> {
- if (leaders.isEmpty()) {
- leaders.add(s);
- } else {
- final long leaderTerm = leaders.get(0).getState().getCurrentTerm();
- final long term = s.getState().getCurrentTerm();
- if (term >= leaderTerm) {
- if (term > leaderTerm) {
- leaders.clear();
- }
- leaders.add(s);
- }
- }
- });
- if (leaders.isEmpty()) {
- return null;
- } else if (leaders.size() != 1) {
- Assert.fail(printServers() + leaders.toString()
- + "leaders.size() = " + leaders.size() + " != 1");
- }
- return leaders.get(0);
- }
-
- public boolean isLeader(String leaderId) throws InterruptedException {
- final RaftServerImpl leader = getLeader();
- return leader != null && leader.getId().equals(leaderId);
- }
-
- public List<RaftServerImpl> getFollowers() {
- return servers.values().stream()
- .filter(s -> s.isAlive() && s.isFollower())
- .collect(Collectors.toList());
- }
-
- public Collection<RaftServerImpl> getServers() {
- return servers.values();
- }
-
- public RaftServerImpl getServer(String id) {
- return servers.get(id);
- }
-
- public Collection<RaftPeer> getPeers() {
- return getServers().stream().map(s ->
- new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress()))
- .collect(Collectors.toList());
- }
-
- public RaftClient createClient(String clientId, String leaderId) {
- return new RaftClientImpl(clientId, conf.getPeers(),
- getRaftClientRequestSender(), leaderId, properties);
- }
-
- public void shutdown() {
- LOG.info("Stopping " + getClass().getSimpleName());
- servers.values().stream().filter(RaftServerImpl::isAlive)
- .forEach(RaftServerImpl::close);
-
- if (ExitUtils.isTerminated()) {
- LOG.error("Test resulted in an unexpected exit",
- ExitUtils.getFirstExitException());
- throw new AssertionError("Test resulted in an unexpected exit");
- }
- }
-
- /**
- * Block all the incoming requests for the peer with leaderId. Also delay
- * outgoing or incoming msg for all other peers.
- */
- protected abstract void blockQueueAndSetDelay(String leaderId, int delayMs)
- throws InterruptedException;
-
- /**
- * Try to enforce the leader of the cluster.
- * @param leaderId ID of the targeted leader server.
- * @return true if server has been successfully enforced to the leader, false
- * otherwise.
- */
- public boolean tryEnforceLeader(String leaderId) throws InterruptedException {
- // do nothing and see if the given id is already a leader.
- if (isLeader(leaderId)) {
- return true;
- }
-
- // Blocking all other server's RPC read process to make sure a read takes at
- // least ELECTION_TIMEOUT_MIN. In this way when the target leader request a
- // vote, all non-leader servers can grant the vote.
- // Disable the target leader server RPC so that it can request a vote.
- blockQueueAndSetDelay(leaderId, RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT);
-
- // Reopen queues so that the vote can make progress.
- blockQueueAndSetDelay(leaderId, 0);
-
- return isLeader(leaderId);
- }
-
- /** Block/unblock the requests sent from the given source. */
- public abstract void setBlockRequestsFrom(String src, boolean block);
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java b/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java
deleted file mode 100644
index ed40bde..0000000
--- a/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft;
-
-import org.apache.raft.RaftTestUtil.SimpleMessage;
-import org.apache.raft.client.RaftClient;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.junit.*;
-import org.junit.rules.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static org.apache.raft.RaftTestUtil.waitAndKillLeader;
-import static org.apache.raft.RaftTestUtil.waitForLeader;
-
-public abstract class RaftBasicTests {
- public static final Logger LOG = LoggerFactory.getLogger(RaftBasicTests.class);
-
- public static final int NUM_SERVERS = 5;
-
- protected static final RaftProperties properties = new RaftProperties();
-
- public abstract MiniRaftCluster getCluster();
-
- public RaftProperties getProperties() {
- return properties;
- }
-
- @Rule
- public Timeout globalTimeout = new Timeout(120 * 1000);
-
- @Before
- public void setup() throws IOException {
- Assert.assertNull(getCluster().getLeader());
- getCluster().start();
- }
-
- @After
- public void tearDown() {
- final MiniRaftCluster cluster = getCluster();
- if (cluster != null) {
- cluster.shutdown();
- }
- }
-
- @Test
- public void testBasicLeaderElection() throws Exception {
- LOG.info("Running testBasicLeaderElection");
- final MiniRaftCluster cluster = getCluster();
- waitAndKillLeader(cluster, true);
- waitAndKillLeader(cluster, true);
- waitAndKillLeader(cluster, true);
- waitAndKillLeader(cluster, false);
- }
-
- @Test
- public void testBasicAppendEntries() throws Exception {
- LOG.info("Running testBasicAppendEntries");
- final MiniRaftCluster cluster = getCluster();
- RaftServerImpl leader = waitForLeader(cluster);
- final long term = leader.getState().getCurrentTerm();
- final String killed = cluster.getFollowers().get(3).getId();
- cluster.killServer(killed);
- LOG.info(cluster.printServers());
-
- final SimpleMessage[] messages = SimpleMessage.create(10);
- try(final RaftClient client = cluster.createClient("client", null)) {
- for (SimpleMessage message : messages) {
- client.send(message);
- }
- }
-
- Thread.sleep(cluster.getMaxTimeout() + 100);
- LOG.info(cluster.printAllLogs());
-
- cluster.getServers().stream().filter(RaftServerImpl::isAlive)
- .map(s -> s.getState().getLog().getEntries(1, Long.MAX_VALUE))
- .forEach(e -> RaftTestUtil.assertLogEntries(e, 1, term, messages));
- }
-
- @Test
- public void testEnforceLeader() throws Exception {
- LOG.info("Running testEnforceLeader");
- final String leader = "s" + ThreadLocalRandom.current().nextInt(NUM_SERVERS);
- LOG.info("enforce leader to " + leader);
- final MiniRaftCluster cluster = getCluster();
- waitForLeader(cluster);
- waitForLeader(cluster, leader);
- }
-
- static class Client4TestWithLoad extends Thread {
- final RaftClient client;
- final SimpleMessage[] messages;
-
- final AtomicInteger step = new AtomicInteger();
- volatile Exception exceptionInClientThread;
-
- Client4TestWithLoad(RaftClient client, int numMessages) {
- this.client = client;
- this.messages = SimpleMessage.create(numMessages, client.getId());
- }
-
- boolean isRunning() {
- return step.get() < messages.length && exceptionInClientThread == null;
- }
-
- @Override
- public void run() {
- try {
- for (; isRunning(); ) {
- client.send(messages[step.getAndIncrement()]);
- }
- client.close();
- } catch (IOException ioe) {
- exceptionInClientThread = ioe;
- }
- }
- }
-
- @Test
- public void testWithLoad() throws Exception {
- testWithLoad(10, 500);
- }
-
- private void testWithLoad(final int numClients, final int numMessages)
- throws Exception {
- LOG.info("Running testWithLoad: numClients=" + numClients
- + ", numMessages=" + numMessages);
-
- final MiniRaftCluster cluster = getCluster();
- LOG.info(cluster.printServers());
-
- final List<Client4TestWithLoad> clients
- = Stream.iterate(0, i -> i+1).limit(numClients)
- .map(i -> cluster.createClient(String.valueOf((char)('a' + i)), null))
- .map(c -> new Client4TestWithLoad(c, numMessages))
- .collect(Collectors.toList());
- clients.forEach(Thread::start);
-
- int count = 0;
- for(int lastStep = 0;; ) {
- if (clients.stream().filter(Client4TestWithLoad::isRunning).count() == 0) {
- break;
- }
-
- final int n = clients.stream().mapToInt(c -> c.step.get()).sum();
- if (n - lastStep < 50 * numClients) { // Change leader at least 50 steps.
- Thread.sleep(10);
- continue;
- }
- lastStep = n;
- count++;
-
- RaftServerImpl leader = cluster.getLeader();
- if (leader != null) {
- final String oldLeader = leader.getId();
- LOG.info("Block all requests sent by leader " + oldLeader);
- String newLeader = RaftTestUtil.changeLeader(cluster, oldLeader);
- LOG.info("Changed leader from " + oldLeader + " to " + newLeader);
- Assert.assertFalse(newLeader.equals(oldLeader));
- }
- }
-
- for(Client4TestWithLoad c : clients) {
- c.join();
- }
- for(Client4TestWithLoad c : clients) {
- if (c.exceptionInClientThread != null) {
- throw new AssertionError(c.exceptionInClientThread);
- }
- RaftTestUtil.assertLogEntries(cluster.getServers(), c.messages);
- }
-
- LOG.info("Leader change count=" + count + cluster.printAllLogs());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java b/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java
deleted file mode 100644
index 195cbec..0000000
--- a/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft;
-
-import org.apache.log4j.Level;
-import org.apache.raft.RaftTestUtil.SimpleMessage;
-import org.apache.raft.client.RaftClient;
-import org.apache.raft.client.RaftClientRequestSender;
-import org.apache.raft.client.impl.RaftClientImpl;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.server.simulation.RequestHandler;
-import org.apache.raft.server.storage.RaftLog;
-import org.apache.raft.util.RaftUtils;
-import org.junit.*;
-import org.junit.rules.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-import static org.apache.raft.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
-
-public abstract class RaftNotLeaderExceptionBaseTest {
- static {
- RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
- RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
- RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
- RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
- }
-
- public static final Logger LOG =
- LoggerFactory.getLogger(RaftNotLeaderExceptionBaseTest.class);
- public static final int NUM_PEERS = 3;
-
- @Rule
- public Timeout globalTimeout = new Timeout(60 * 1000);
-
- private MiniRaftCluster cluster;
-
- public abstract MiniRaftCluster initCluster() throws IOException;
-
- @Before
- public void setup() throws IOException {
- this.cluster = initCluster();
- cluster.start();
- }
-
- @After
- public void tearDown() {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
-
- @Test
- public void testHandleNotLeaderException() throws Exception {
- RaftTestUtil.waitForLeader(cluster);
- final String leaderId = cluster.getLeader().getId();
- final RaftClient client = cluster.createClient("client", leaderId);
-
- RaftClientReply reply = client.send(new SimpleMessage("m1"));
- Assert.assertTrue(reply.isSuccess());
-
- // enforce leader change
- String newLeader = RaftTestUtil.changeLeader(cluster, leaderId);
- Assert.assertNotEquals(leaderId, newLeader);
-
- RaftClientRequestSender rpc = ((RaftClientImpl)client).getRequestSender();
- reply= null;
- for (int i = 0; reply == null && i < 10; i++) {
- try {
- reply = rpc.sendRequest(
- new RaftClientRequest("client", leaderId, DEFAULT_SEQNUM,
- new SimpleMessage("m2")));
- } catch (IOException ignored) {
- Thread.sleep(1000);
- }
- }
- Assert.assertNotNull(reply);
- Assert.assertFalse(reply.isSuccess());
- Assert.assertTrue(reply.isNotLeader());
- Assert.assertEquals(newLeader,
- reply.getNotLeaderException().getSuggestedLeader().getId());
-
- reply = client.send(new SimpleMessage("m3"));
- Assert.assertTrue(reply.isSuccess());
- client.close();
- }
-
- @Test
- public void testNotLeaderExceptionWithReconf() throws Exception {
- Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster));
-
- final String leaderId = cluster.getLeader().getId();
- final RaftClient client = cluster.createClient("client", leaderId);
-
- // enforce leader change
- String newLeader = RaftTestUtil.changeLeader(cluster, leaderId);
- Assert.assertNotEquals(leaderId, newLeader);
-
- // also add two new peers
- // add two more peers
- MiniRaftCluster.PeerChanges change = cluster.addNewPeers(
- new String[]{"ss1", "ss2"}, true);
- // trigger setConfiguration
- LOG.info("Start changing the configuration: {}",
- Arrays.asList(change.allPeersInNewConf));
- try(final RaftClient c2 = cluster.createClient("client2", newLeader)) {
- RaftClientReply reply = c2.setConfiguration(change.allPeersInNewConf);
- Assert.assertTrue(reply.isSuccess());
- }
- LOG.info(cluster.printServers());
-
- RaftClientRequestSender rpc = ((RaftClientImpl)client).getRequestSender();
- RaftClientReply reply = null;
- // it is possible that the remote peer's rpc server is not ready. need retry
- for (int i = 0; reply == null && i < 10; i++) {
- try {
- reply = rpc.sendRequest(
- new RaftClientRequest("client", leaderId, DEFAULT_SEQNUM,
- new SimpleMessage("m1")));
- } catch (IOException ignored) {
- Thread.sleep(1000);
- }
- }
- Assert.assertNotNull(reply);
- Assert.assertFalse(reply.isSuccess());
- Assert.assertTrue(reply.isNotLeader());
- Assert.assertEquals(newLeader,
- reply.getNotLeaderException().getSuggestedLeader().getId());
- Collection<RaftPeer> peers = cluster.getPeers();
- RaftPeer[] peersFromReply = reply.getNotLeaderException().getPeers();
- Assert.assertEquals(peers.size(), peersFromReply.length);
- for (RaftPeer p : peersFromReply) {
- Assert.assertTrue(peers.contains(p));
- }
-
- reply = client.send(new SimpleMessage("m2"));
- Assert.assertTrue(reply.isSuccess());
- client.close();
- }
-}