You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/01/31 21:17:06 UTC
[21/54] [abbrv] incubator-ratis git commit: Renamed the packages from
raft to ratis in preparation for Apache Incubation - Moved all java packages
from org.apache.raft to org.apache.ratis. - Moved native package to
org_apache_ratis, and native lib to l
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
new file mode 100644
index 0000000..7b3845a
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.Timestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+class LeaderElection extends Daemon {
+ public static final Logger LOG = LoggerFactory.getLogger(LeaderElection.class);
+
+ private ResultAndTerm logAndReturn(Result result,
+ List<RequestVoteReplyProto> responses,
+ List<Exception> exceptions, long newTerm) {
+ LOG.info(server.getId() + ": Election " + result + "; received "
+ + responses.size() + " response(s) "
+ + responses.stream().map(r -> ProtoUtils.toString(r)).collect(Collectors.toList())
+ + " and " + exceptions.size() + " exception(s); " + server.getState());
+ int i = 0;
+ for(Exception e : exceptions) {
+ LOG.info(" " + i++ + ": " + e);
+ LOG.trace("TRACE", e);
+ }
+ return new ResultAndTerm(result, newTerm);
+ }
+
+ enum Result {PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN}
+
+ private static class ResultAndTerm {
+ final Result result;
+ final long term;
+
+ ResultAndTerm(Result result, long term) {
+ this.result = result;
+ this.term = term;
+ }
+ }
+
+ private final RaftServerImpl server;
+ private ExecutorCompletionService<RequestVoteReplyProto> service;
+ private ExecutorService executor;
+ private volatile boolean running;
+ /**
+ * The Raft configuration should not change while the peer is in candidate
+ * state. If the configuration changes, another peer should be acting as a
+ * leader and this LeaderElection session should end.
+ */
+ private final RaftConfiguration conf;
+ private final Collection<RaftPeer> others;
+
+ LeaderElection(RaftServerImpl server) {
+ this.server = server;
+ conf = server.getRaftConf();
+ others = conf.getOtherPeers(server.getId());
+ this.running = true;
+ }
+
+ void stopRunning() {
+ this.running = false;
+ }
+
+ private void initExecutor() {
+ Preconditions.checkState(!others.isEmpty());
+ executor = Executors.newFixedThreadPool(others.size(),
+ new ThreadFactoryBuilder().setDaemon(true).build());
+ service = new ExecutorCompletionService<>(executor);
+ }
+
+ @Override
+ public void run() {
+ try {
+ askForVotes();
+ } catch (InterruptedException e) {
+ // the leader election thread is interrupted. The peer may already step
+ // down to a follower. The leader election should skip.
+ LOG.info(server.getId() + " " + getClass().getSimpleName()
+ + " thread is interrupted gracefully; server=" + server);
+ } catch (IOException e) {
+ LOG.warn("Failed to persist votedFor/term. Exit the leader election.", e);
+ stopRunning();
+ }
+ }
+
+ /**
+ * After a peer changes its role to candidate, it invokes this method to
+ * send out requestVote rpc to all other peers.
+ */
+ private void askForVotes() throws InterruptedException, IOException {
+ final ServerState state = server.getState();
+ while (running && server.isCandidate()) {
+ // one round of requestVotes
+ final long electionTerm;
+ synchronized (server) {
+ electionTerm = state.initElection();
+ server.getState().persistMetadata();
+ }
+ LOG.info(state.getSelfId() + ": begin an election in Term "
+ + electionTerm);
+
+ TermIndex lastEntry = ServerProtoUtils.toTermIndex(
+ state.getLog().getLastEntry());
+ if (lastEntry == null) {
+ // lastEntry may need to be derived from snapshot
+ SnapshotInfo snapshot = state.getLatestSnapshot();
+ if (snapshot != null) {
+ lastEntry = snapshot.getTermIndex();
+ }
+ }
+
+ final ResultAndTerm r;
+ if (others.isEmpty()) {
+ r = new ResultAndTerm(Result.PASSED, electionTerm);
+ } else {
+ try {
+ initExecutor();
+ int submitted = submitRequests(electionTerm, lastEntry);
+ r = waitForResults(electionTerm, submitted);
+ } finally {
+ if (executor != null) {
+ executor.shutdown();
+ }
+ }
+ }
+
+ synchronized (server) {
+ if (electionTerm != state.getCurrentTerm() || !running ||
+ !server.isCandidate()) {
+ return; // term already passed or no longer a candidate.
+ }
+
+ switch (r.result) {
+ case PASSED:
+ server.changeToLeader();
+ return;
+ case SHUTDOWN:
+ LOG.info("{} received shutdown response when requesting votes.",
+ server.getId());
+ server.close();
+ return;
+ case REJECTED:
+ case DISCOVERED_A_NEW_TERM:
+ final long term = r.term > server.getState().getCurrentTerm() ?
+ r.term : server.getState().getCurrentTerm();
+ server.changeToFollower(term, true);
+ return;
+ case TIMEOUT:
+ // should start another election
+ }
+ }
+ }
+ }
+
+ private int submitRequests(final long electionTerm, final TermIndex lastEntry) {
+ int submitted = 0;
+ for (final RaftPeer peer : others) {
+ final RequestVoteRequestProto r = server.createRequestVoteRequest(
+ peer.getId(), electionTerm, lastEntry);
+ service.submit(
+ () -> server.getServerRpc().requestVote(r));
+ submitted++;
+ }
+ return submitted;
+ }
+
+ private ResultAndTerm waitForResults(final long electionTerm,
+ final int submitted) throws InterruptedException {
+ final Timestamp timeout = new Timestamp().addTimeMs(server.getRandomTimeoutMs());
+ final List<RequestVoteReplyProto> responses = new ArrayList<>();
+ final List<Exception> exceptions = new ArrayList<>();
+ int waitForNum = submitted;
+ Collection<String> votedPeers = new ArrayList<>();
+ while (waitForNum > 0 && running && server.isCandidate()) {
+ final long waitTime = -timeout.elapsedTimeMs();
+ if (waitTime <= 0) {
+ return logAndReturn(Result.TIMEOUT, responses, exceptions, -1);
+ }
+
+ try {
+ final Future<RequestVoteReplyProto> future = service.poll(
+ waitTime, TimeUnit.MILLISECONDS);
+ if (future == null) {
+ continue; // poll timeout, continue to return Result.TIMEOUT
+ }
+
+ final RequestVoteReplyProto r = future.get();
+ responses.add(r);
+ if (r.getShouldShutdown()) {
+ return logAndReturn(Result.SHUTDOWN, responses, exceptions, -1);
+ }
+ if (r.getTerm() > electionTerm) {
+ return logAndReturn(Result.DISCOVERED_A_NEW_TERM, responses,
+ exceptions, r.getTerm());
+ }
+ if (r.getServerReply().getSuccess()) {
+ votedPeers.add(r.getServerReply().getReplyId());
+ if (conf.hasMajority(votedPeers, server.getId())) {
+ return logAndReturn(Result.PASSED, responses, exceptions, -1);
+ }
+ }
+ } catch(ExecutionException e) {
+ LOG.info("Got exception when requesting votes: " + e);
+ LOG.trace("TRACE", e);
+ exceptions.add(e);
+ }
+ waitForNum--;
+ }
+ // received all the responses
+ return logAndReturn(Result.REJECTED, responses, exceptions, -1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
new file mode 100644
index 0000000..e4b2889
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -0,0 +1,601 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_RPC_SLEEP_TIME_MS_DEFAULT;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_RPC_SLEEP_TIME_MS_KEY;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_KEY;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SNAPSHOT_CHUNK_MAX_SIZE_DEFAULT;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SNAPSHOT_CHUNK_MAX_SIZE_KEY;
+import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.STAGINGPROGRESS;
+import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.STEPDOWN;
+import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.UPDATECOMMIT;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.ReconfigurationTimeoutException;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.shaded.proto.RaftProtos.LeaderNoOp;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.Timestamp;
+import org.slf4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * States for leader only. It contains three different types of processors:
+ * 1. RPC senders: each thread is appending log to a follower
+ * 2. EventProcessor: a single thread updating the raft server's state based on
+ * status of log appending response
+ * 3. PendingRequestHandler: a handler sending back responses to clients when
+ * corresponding log entries are committed
+ */
+public class LeaderState {
+ private static final Logger LOG = RaftServerImpl.LOG;
+
+ enum StateUpdateEventType {
+ STEPDOWN, UPDATECOMMIT, STAGINGPROGRESS
+ }
+
+ enum BootStrapProgress {
+ NOPROGRESS, PROGRESSING, CAUGHTUP
+ }
+
+ static class StateUpdateEvent {
+ final StateUpdateEventType type;
+ final long newTerm;
+
+ StateUpdateEvent(StateUpdateEventType type, long newTerm) {
+ this.type = type;
+ this.newTerm = newTerm;
+ }
+ }
+
+ static final StateUpdateEvent UPDATE_COMMIT_EVENT =
+ new StateUpdateEvent(StateUpdateEventType.UPDATECOMMIT, -1);
+ static final StateUpdateEvent STAGING_PROGRESS_EVENT =
+ new StateUpdateEvent(StateUpdateEventType.STAGINGPROGRESS, -1);
+
+ private final RaftServerImpl server;
+ private final RaftLog raftLog;
+ private final long currentTerm;
+ private volatile ConfigurationStagingState stagingState;
+ private List<List<FollowerInfo>> voterLists;
+
+ /**
+ * The list of threads appending entries to followers.
+ * The list is protected by the RaftServer's lock.
+ */
+ private final List<LogAppender> senders;
+ private final BlockingQueue<StateUpdateEvent> eventQ;
+ private final EventProcessor processor;
+ private final PendingRequests pendingRequests;
+ private volatile boolean running = true;
+
+ private final int stagingCatchupGap;
+ private final int snapshotChunkMaxSize;
+ private final int syncInterval;
+
+ LeaderState(RaftServerImpl server, RaftProperties properties) {
+ this.server = server;
+
+ stagingCatchupGap = properties.getInt(
+ RAFT_SERVER_STAGING_CATCHUP_GAP_KEY,
+ RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT);
+ snapshotChunkMaxSize = properties.getInt(
+ RAFT_SNAPSHOT_CHUNK_MAX_SIZE_KEY,
+ RAFT_SNAPSHOT_CHUNK_MAX_SIZE_DEFAULT);
+ syncInterval = properties.getInt(
+ RAFT_SERVER_RPC_SLEEP_TIME_MS_KEY,
+ RAFT_SERVER_RPC_SLEEP_TIME_MS_DEFAULT);
+
+ final ServerState state = server.getState();
+ this.raftLog = state.getLog();
+ this.currentTerm = state.getCurrentTerm();
+ eventQ = new ArrayBlockingQueue<>(4096);
+ processor = new EventProcessor();
+ pendingRequests = new PendingRequests(server);
+
+ final RaftConfiguration conf = server.getRaftConf();
+ Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId());
+ final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
+ final long nextIndex = raftLog.getNextIndex();
+ senders = new ArrayList<>(others.size());
+ for (RaftPeer p : others) {
+ FollowerInfo f = new FollowerInfo(p, t, nextIndex, true);
+ senders.add(server.getLogAppenderFactory().getLogAppender(server, this, f));
+ }
+ voterLists = divideFollowers(conf);
+ }
+
+ void start() {
+ // In the beginning of the new term, replicate an empty entry in order
+ // to finally commit entries in the previous term.
+ // Also this message can help identify the last committed index when
+ // the leader peer is just started.
+ final LogEntryProto placeHolder = LogEntryProto.newBuilder()
+ .setTerm(server.getState().getCurrentTerm())
+ .setIndex(raftLog.getNextIndex())
+ .setNoOp(LeaderNoOp.newBuilder()).build();
+ raftLog.append(placeHolder);
+
+ processor.start();
+ startSenders();
+ }
+
+ private void startSenders() {
+ senders.forEach(Thread::start);
+ }
+
+ void stop() {
+ this.running = false;
+ // do not interrupt event processor since it may be in the middle of logSync
+ for (LogAppender sender : senders) {
+ sender.stopSender();
+ sender.interrupt();
+ }
+ try {
+ pendingRequests.sendNotLeaderResponses();
+ } catch (IOException e) {
+ LOG.warn("Caught exception in sendNotLeaderResponses", e);
+ }
+ }
+
+ void notifySenders() {
+ senders.forEach(LogAppender::notifyAppend);
+ }
+
+ boolean inStagingState() {
+ return stagingState != null;
+ }
+
+ ConfigurationStagingState getStagingState() {
+ return stagingState;
+ }
+
+ long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ int getSnapshotChunkMaxSize() {
+ return snapshotChunkMaxSize;
+ }
+
+ int getSyncInterval() {
+ return syncInterval;
+ }
+
+ /**
+ * Start bootstrapping new peers
+ */
+ PendingRequest startSetConfiguration(SetConfigurationRequest request) {
+ Preconditions.checkState(running && !inStagingState());
+
+ RaftPeer[] peersInNewConf = request.getPeersInNewConf();
+ Collection<RaftPeer> peersToBootStrap = RaftConfiguration
+ .computeNewPeers(peersInNewConf, server.getRaftConf());
+
+ // add the request to the pending queue
+ final PendingRequest pending = pendingRequests.addConfRequest(request);
+
+ ConfigurationStagingState stagingState = new ConfigurationStagingState(
+ peersToBootStrap, new PeerConfiguration(Arrays.asList(peersInNewConf)));
+ Collection<RaftPeer> newPeers = stagingState.getNewPeers();
+ // set the staging state
+ this.stagingState = stagingState;
+
+ if (newPeers.isEmpty()) {
+ applyOldNewConf();
+ } else {
+ // update the LeaderState's sender list
+ addSenders(newPeers);
+ }
+ return pending;
+ }
+
+ PendingRequest addPendingRequest(long index, RaftClientRequest request,
+ TransactionContext entry) {
+ return pendingRequests.addPendingRequest(index, request, entry);
+ }
+
+ private void applyOldNewConf() {
+ final ServerState state = server.getState();
+ final RaftConfiguration current = server.getRaftConf();
+ final RaftConfiguration oldNewConf= stagingState.generateOldNewConf(current,
+ state.getLog().getNextIndex());
+ // apply the (old, new) configuration to log, and use it as the current conf
+ long index = state.getLog().append(state.getCurrentTerm(), oldNewConf);
+ updateConfiguration(index, oldNewConf);
+
+ this.stagingState = null;
+ notifySenders();
+ }
+
+ private void updateConfiguration(long logIndex, RaftConfiguration newConf) {
+ voterLists = divideFollowers(newConf);
+ server.getState().setRaftConf(logIndex, newConf);
+ }
+
+ /**
+ * After receiving a setConfiguration request, the leader should update its
+ * RpcSender list.
+ */
+ void addSenders(Collection<RaftPeer> newMembers) {
+ final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
+ final long nextIndex = raftLog.getNextIndex();
+ for (RaftPeer peer : newMembers) {
+ FollowerInfo f = new FollowerInfo(peer, t, nextIndex, false);
+ LogAppender sender = server.getLogAppenderFactory()
+ .getLogAppender(server, this, f);
+ senders.add(sender);
+ sender.start();
+ }
+ }
+
+ /**
+ * Update the RpcSender list based on the current configuration
+ */
+ private void updateSenders(RaftConfiguration conf) {
+ Preconditions.checkState(conf.isStable() && !inStagingState());
+ Iterator<LogAppender> iterator = senders.iterator();
+ while (iterator.hasNext()) {
+ LogAppender sender = iterator.next();
+ if (!conf.containsInConf(sender.getFollower().getPeer().getId())) {
+ iterator.remove();
+ sender.stopSender();
+ sender.interrupt();
+ }
+ }
+ }
+
+ void submitUpdateStateEvent(StateUpdateEvent event) {
+ try {
+ eventQ.put(event);
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted when adding event {} into the queue", event);
+ }
+ }
+
+ private void prepare() {
+ synchronized (server) {
+ if (running) {
+ final RaftConfiguration conf = server.getRaftConf();
+ if (conf.isTransitional() && server.getState().isConfCommitted()) {
+ // the configuration is in transitional state, and has been committed
+ // so it is time to generate and replicate (new) conf.
+ replicateNewConf();
+ }
+ }
+ }
+ }
+
+ /**
+ * The processor thread takes the responsibility to update the raft server's
+ * state, such as changing to follower, or updating the committed index.
+ */
+ private class EventProcessor extends Daemon {
+ @Override
+ public void run() {
+ // apply an empty message; check if necessary to replicate (new) conf
+ prepare();
+
+ while (running) {
+ try {
+ StateUpdateEvent event = eventQ.poll(server.getMaxTimeoutMs(),
+ TimeUnit.MILLISECONDS);
+ synchronized (server) {
+ if (running) {
+ handleEvent(event);
+ }
+ }
+ // the updated configuration does not need to be sync'ed here
+ } catch (InterruptedException e) {
+ final String s = server.getId() + " " + getClass().getSimpleName()
+ + " thread is interrupted ";
+ if (!running) {
+ LOG.info(s + " gracefully; server=" + server);
+ } else {
+ LOG.warn(s + " UNEXPECTEDLY; server=" + server, e);
+ throw new RuntimeException(e);
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to persist new votedFor/term.", e);
+ // the failure should happen while changing the state to follower
+ // thus the in-memory state should have been updated
+ Preconditions.checkState(!running);
+ }
+ }
+ }
+ }
+
+ private void handleEvent(StateUpdateEvent e) throws IOException {
+ if (e == null) {
+ if (inStagingState()) {
+ checkNewPeers();
+ }
+ } else {
+ if (e.type == STEPDOWN) {
+ server.changeToFollower(e.newTerm, true);
+ } else if (e.type == UPDATECOMMIT) {
+ updateLastCommitted();
+ } else if (e.type == STAGINGPROGRESS) {
+ checkNewPeers();
+ }
+ }
+ }
+
+ /**
+ * So far we use a simple implementation for catchup checking:
+ * 1. If the latest rpc time of the remote peer is before 3 * max_timeout,
+ * the peer made no progress for that long. We should fail the whole
+ * setConfiguration request.
+ * 2. If the peer's matching index is just behind for a small gap, and the
+ * peer was updated recently (within max_timeout), declare the peer as
+ * caught-up.
+ * 3. Otherwise the peer is making progressing. Keep waiting.
+ */
+ private BootStrapProgress checkProgress(FollowerInfo follower,
+ long committed) {
+ Preconditions.checkArgument(!follower.isAttendingVote());
+ final Timestamp progressTime = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
+ final Timestamp timeoutTime = new Timestamp().addTimeMs(-3*server.getMaxTimeoutMs());
+ if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) {
+ LOG.debug("{} detects a follower {} timeout for bootstrapping," +
+ " timeoutTime: {}", server.getId(), follower, timeoutTime);
+ return BootStrapProgress.NOPROGRESS;
+ } else if (follower.getMatchIndex() + stagingCatchupGap > committed
+ && follower.getLastRpcResponseTime().compareTo(progressTime) > 0) {
+ return BootStrapProgress.CAUGHTUP;
+ } else {
+ return BootStrapProgress.PROGRESSING;
+ }
+ }
+
+ private Collection<BootStrapProgress> checkAllProgress(long committed) {
+ Preconditions.checkState(inStagingState());
+ return senders.stream()
+ .filter(sender -> !sender.getFollower().isAttendingVote())
+ .map(sender -> checkProgress(sender.getFollower(), committed))
+ .collect(Collectors.toCollection(ArrayList::new));
+ }
+
+ private void checkNewPeers() {
+ if (!inStagingState()) {
+ // it is possible that the bootstrapping is done and we still have
+ // remaining STAGINGPROGRESS event to handle.
+ updateLastCommitted();
+ } else {
+ final long committedIndex = server.getState().getLog()
+ .getLastCommittedIndex();
+ Collection<BootStrapProgress> reports = checkAllProgress(committedIndex);
+ if (reports.contains(BootStrapProgress.NOPROGRESS)) {
+ LOG.debug("{} fails the setConfiguration request", server.getId());
+ stagingState.fail();
+ } else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
+ // all caught up!
+ applyOldNewConf();
+ for (LogAppender sender : senders) {
+ sender.getFollower().startAttendVote();
+ }
+ }
+ }
+ }
+
+ boolean isBootStrappingPeer(String peerId) {
+ return inStagingState() && getStagingState().contains(peerId);
+ }
+
+ private void updateLastCommitted() {
+ final String selfId = server.getId();
+ final RaftConfiguration conf = server.getRaftConf();
+ long majorityInNewConf = computeLastCommitted(voterLists.get(0),
+ conf.containsInConf(selfId));
+ final long oldLastCommitted = raftLog.getLastCommittedIndex();
+ final LogEntryProto[] entriesToCommit;
+ if (!conf.isTransitional()) {
+ // copy the entries that may get committed out of the raftlog, to prevent
+ // the possible race that the log gets purged after the statemachine does
+ // a snapshot
+ entriesToCommit = raftLog.getEntries(oldLastCommitted + 1,
+ Math.max(majorityInNewConf, oldLastCommitted) + 1);
+ server.getState().updateStatemachine(majorityInNewConf, currentTerm);
+ } else { // configuration is in transitional state
+ long majorityInOldConf = computeLastCommitted(voterLists.get(1),
+ conf.containsInOldConf(selfId));
+ final long majority = Math.min(majorityInNewConf, majorityInOldConf);
+ entriesToCommit = raftLog.getEntries(oldLastCommitted + 1,
+ Math.max(majority, oldLastCommitted) + 1);
+ server.getState().updateStatemachine(majority, currentTerm);
+ }
+ checkAndUpdateConfiguration(entriesToCommit);
+ }
+
+ private boolean committedConf(LogEntryProto[] entries) {
+ final long currentCommitted = raftLog.getLastCommittedIndex();
+ for (LogEntryProto entry : entries) {
+ if (entry.getIndex() <= currentCommitted &&
+ ProtoUtils.isConfigurationLogEntry(entry)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void checkAndUpdateConfiguration(LogEntryProto[] entriesToCheck) {
+ final RaftConfiguration conf = server.getRaftConf();
+ if (committedConf(entriesToCheck)) {
+ if (conf.isTransitional()) {
+ replicateNewConf();
+ } else { // the (new) log entry has been committed
+ LOG.debug("{} sends success to setConfiguration request", server.getId());
+ pendingRequests.replySetConfiguration();
+ // if the leader is not included in the current configuration, step down
+ if (!conf.containsInConf(server.getId())) {
+ LOG.info("{} is not included in the new configuration {}. Step down.",
+ server.getId(), conf);
+ try {
+ // leave some time for all RPC senders to send out new conf entry
+ Thread.sleep(server.getMinTimeoutMs());
+ } catch (InterruptedException ignored) {
+ }
+ // the pending request handler will send NotLeaderException for
+ // pending client requests when it stops
+ server.close();
+ }
+ }
+ }
+ }
+
+ /**
+ * when the (old, new) log entry has been committed, should replicate (new):
+ * 1) append (new) to log
+ * 2) update conf to (new)
+ * 3) update RpcSenders list
+ * 4) start replicating the log entry
+ */
+ private void replicateNewConf() {
+ final RaftConfiguration conf = server.getRaftConf();
+ final RaftConfiguration newConf = RaftConfiguration.newBuilder()
+ .setConf(conf)
+ .setLogEntryIndex(raftLog.getNextIndex())
+ .build();
+ // stop the LogAppender if the corresponding follower is no longer in the conf
+ updateSenders(newConf);
+ long index = raftLog.append(server.getState().getCurrentTerm(), newConf);
+ updateConfiguration(index, newConf);
+ notifySenders();
+ }
+
+ private long computeLastCommitted(List<FollowerInfo> followers,
+ boolean includeSelf) {
+ final int length = includeSelf ? followers.size() + 1 : followers.size();
+ final long[] indices = new long[length];
+ for (int i = 0; i < followers.size(); i++) {
+ indices[i] = followers.get(i).getMatchIndex();
+ }
+ if (includeSelf) {
+ // note that we also need to wait for the local disk I/O
+ indices[length - 1] = raftLog.getLatestFlushedIndex();
+ }
+
+ Arrays.sort(indices);
+ return indices[(indices.length - 1) / 2];
+ }
+
+ private List<List<FollowerInfo>> divideFollowers(RaftConfiguration conf) {
+ List<List<FollowerInfo>> lists = new ArrayList<>(2);
+ List<FollowerInfo> listForNew = senders.stream()
+ .filter(sender -> conf.containsInConf(sender.getFollower().getPeer().getId()))
+ .map(LogAppender::getFollower)
+ .collect(Collectors.toList());
+ lists.add(listForNew);
+ if (conf.isTransitional()) {
+ List<FollowerInfo> listForOld = senders.stream()
+ .filter(sender -> conf.containsInOldConf(sender.getFollower().getPeer().getId()))
+ .map(LogAppender::getFollower)
+ .collect(Collectors.toList());
+ lists.add(listForOld);
+ }
+ return lists;
+ }
+
+ PendingRequest returnNoConfChange(SetConfigurationRequest r) {
+ PendingRequest pending = new PendingRequest(r);
+ pending.setSuccessReply(null);
+ return pending;
+ }
+
+ void replyPendingRequest(long logIndex, CompletableFuture<Message> message) {
+ pendingRequests.replyPendingRequest(logIndex, message);
+ }
+
+ TransactionContext getTransactionContext(long index) {
+ return pendingRequests.getTransactionContext(index);
+ }
+
+ private class ConfigurationStagingState {
+ private final Map<String, RaftPeer> newPeers;
+ private final PeerConfiguration newConf;
+
+ ConfigurationStagingState(Collection<RaftPeer> newPeers,
+ PeerConfiguration newConf) {
+ Map<String, RaftPeer> map = new HashMap<>();
+ for (RaftPeer peer : newPeers) {
+ map.put(peer.getId(), peer);
+ }
+ this.newPeers = Collections.unmodifiableMap(map);
+ this.newConf = newConf;
+ }
+
+ RaftConfiguration generateOldNewConf(RaftConfiguration current,
+ long logIndex) {
+ return RaftConfiguration.newBuilder()
+ .setConf(newConf)
+ .setOldConf(current)
+ .setLogEntryIndex(logIndex)
+ .build();
+ }
+
+ Collection<RaftPeer> getNewPeers() {
+ return newPeers.values();
+ }
+
+ boolean contains(String peerId) {
+ return newPeers.containsKey(peerId);
+ }
+
+ void fail() {
+ Iterator<LogAppender> iterator = senders.iterator();
+ while (iterator.hasNext()) {
+ LogAppender sender = iterator.next();
+ if (!sender.getFollower().isAttendingVote()) {
+ iterator.remove();
+ sender.stopSender();
+ sender.interrupt();
+ }
+ }
+ LeaderState.this.stagingState = null;
+ // send back failure response to client's request
+ pendingRequests.failSetConfiguration(
+ new ReconfigurationTimeoutException("Fail to set configuration "
+ + newConf + ". Timeout when bootstrapping new peers."));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
new file mode 100644
index 0000000..5599699
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -0,0 +1,494 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_DEFAULT;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_KEY;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_DEFAULT;
+import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_KEY;
+import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.ratis.server.impl.LeaderState.StateUpdateEventType;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.Timestamp;
+import org.slf4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * A daemon thread appending log entries to a follower peer.
+ *
+ * <p>While {@link #isAppenderRunning()} holds, each round either installs a
+ * snapshot on the follower (when {@link #shouldInstallSnapshot()} returns one)
+ * or sends an appendEntries request, which also serves as the heartbeat.
+ * When there is nothing to append, the thread waits until a heartbeat is due
+ * or until it is woken up by {@link #notifyAppend()}.
+ */
+public class LogAppender extends Daemon {
+  public static final Logger LOG = RaftServerImpl.LOG;
+
+  protected final RaftServerImpl server;
+  private final LeaderState leaderState;
+  protected final RaftLog raftLog;
+  protected final FollowerInfo follower;
+  /** Serialized size, in bytes, at which the entry buffer counts as full. */
+  private final int maxBufferSize;
+  /** If false, newly buffered entries are sent without waiting for the buffer to fill. */
+  private final boolean batchSending;
+  private final LogEntryBuffer buffer;
+  /** The leader's term when this appender was created; put into every appendEntries request. */
+  private final long leaderTerm;
+
+  /** Cleared by {@link #stopSender()} to end the sending loop. */
+  private volatile boolean sending = true;
+
+  public LogAppender(RaftServerImpl server, LeaderState leaderState, FollowerInfo f) {
+    this.follower = f;
+    this.server = server;
+    this.leaderState = leaderState;
+    this.raftLog = server.getState().getLog();
+    this.maxBufferSize = server.getProperties().getInt(
+        RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_KEY,
+        RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_DEFAULT);
+    this.batchSending = server.getProperties().getBoolean(
+        RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_KEY,
+        RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_DEFAULT);
+    this.buffer = new LogEntryBuffer();
+    this.leaderTerm = server.getState().getCurrentTerm();
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "(" + server.getId() + " -> " +
+        follower.getPeer().getId() + ")";
+  }
+
+  /** Runs the sending loop until the appender is stopped or interrupted. */
+  @Override
+  public void run() {
+    try {
+      checkAndSendAppendEntries();
+    } catch (InterruptedException | InterruptedIOException e) {
+      LOG.info(this + " was interrupted: " + e);
+    }
+  }
+
+  protected boolean isAppenderRunning() {
+    return sending;
+  }
+
+  /** Stops the sending loop; it exits at its next {@link #isAppenderRunning()} check. */
+  public void stopSender() {
+    this.sending = false;
+  }
+
+  public FollowerInfo getFollower() {
+    return follower;
+  }
+
+  /**
+   * A buffer for log entries with size limitation.
+   */
+  private class LogEntryBuffer {
+    private final List<LogEntryProto> buf = new ArrayList<>();
+    /** Sum of the serialized sizes of the buffered entries. */
+    private int totalSize = 0;
+
+    void addEntry(LogEntryProto entry) {
+      buf.add(entry);
+      totalSize += entry.getSerializedSize();
+    }
+
+    /** The limit is soft: the entry that crosses it has already been added. */
+    boolean isFull() {
+      return totalSize >= maxBufferSize;
+    }
+
+    boolean isEmpty() {
+      return buf.isEmpty();
+    }
+
+    /**
+     * Builds an appendEntries request carrying all buffered entries and then
+     * empties the buffer.
+     *
+     * @param previous the term-index preceding the first buffered entry; may be null
+     */
+    AppendEntriesRequestProto getAppendRequest(TermIndex previous) {
+      // NOTE(review): buf is cleared right after this call, so this relies on
+      // createAppendEntriesRequest copying the entries instead of keeping the list.
+      final AppendEntriesRequestProto request = server
+          .createAppendEntriesRequest(leaderTerm, follower.getPeer().getId(),
+              previous, buf, !follower.isAttendingVote());
+      buf.clear();
+      totalSize = 0;
+      return request;
+    }
+
+    int getPendingEntryNum() {
+      return buf.size();
+    }
+  }
+
+  /**
+   * @return the term-index of the log entry just before the follower's next
+   *         index; if the log yields none, the latest snapshot's term-index, or
+   *         null when there is no snapshot either
+   * @throws IllegalStateException if the log yields no term-index although the
+   *         follower's next index is not the log's start index
+   */
+  private TermIndex getPrevious() {
+    TermIndex previous = ServerProtoUtils.toTermIndex(
+        raftLog.get(follower.getNextIndex() - 1));
+    if (previous == null) {
+      // if previous is null, nextIndex must be equal to the log start
+      // index (otherwise we will install snapshot).
+      Preconditions.checkState(follower.getNextIndex() == raftLog.getStartIndex(),
+          "follower's next index %s, local log start index %s",
+          follower.getNextIndex(), raftLog.getStartIndex());
+      SnapshotInfo snapshot = server.getState().getLatestSnapshot();
+      previous = snapshot == null ? null : snapshot.getTermIndex();
+    }
+    return previous;
+  }
+
+  /**
+   * Copies not-yet-buffered log entries into the buffer (until it is full) and
+   * decides whether a request should go out now.
+   *
+   * @return a request draining the buffer, or null when the buffered entries
+   *         should wait for more and no heartbeat is due
+   */
+  protected AppendEntriesRequestProto createRequest() {
+    final TermIndex previous = getPrevious();
+    final long leaderNext = raftLog.getNextIndex();
+    long next = follower.getNextIndex() + buffer.getPendingEntryNum();
+    boolean toSend = false;
+
+    if (leaderNext == next && !buffer.isEmpty()) {
+      // no new entries, then send out the entries in the buffer
+      toSend = true;
+    } else if (leaderNext > next) {
+      while (leaderNext > next && !buffer.isFull()) {
+        // stop adding entry once the buffer size is >= the max size
+        buffer.addEntry(raftLog.get(next++));
+      }
+      if (buffer.isFull() || !batchSending) {
+        // buffer is full or batch sending is disabled, send out a request
+        toSend = true;
+      }
+    }
+
+    if (toSend || shouldHeartbeat()) {
+      return buffer.getAppendRequest(previous);
+    }
+    return null;
+  }
+
+  /**
+   * Send an appendEntries RPC; retry indefinitely on IOException, sleeping the
+   * leader's sync interval between attempts. A request that carries entries is
+   * resent as-is; an entry-less one is rebuilt on every attempt.
+   *
+   * @return the reply, or null if there is nothing to send or the appender stopped
+   */
+  private AppendEntriesReplyProto sendAppendEntriesWithRetries()
+      throws InterruptedException, InterruptedIOException {
+    int retry = 0;
+    AppendEntriesRequestProto request = null;
+    while (isAppenderRunning()) { // keep retrying for IOException
+      try {
+        if (request == null || request.getEntriesCount() == 0) {
+          request = createRequest();
+        }
+
+        if (request == null) {
+          LOG.trace("{} need not send AppendEntries now." +
+              " Wait for more entries.", server.getId());
+          return null;
+        } else if (!isAppenderRunning()) {
+          LOG.debug("LogAppender {} has been stopped. Skip the request.", this);
+          return null;
+        }
+
+        follower.updateLastRpcSendTime();
+        final AppendEntriesReplyProto r = server.getServerRpc()
+            .appendEntries(request);
+        follower.updateLastRpcResponseTime();
+
+        return r;
+      } catch (InterruptedIOException iioe) {
+        throw iioe;
+      } catch (IOException ioe) {
+        LOG.debug(this + ": failed to send appendEntries; retry " + retry++, ioe);
+      }
+      if (isAppenderRunning()) {
+        Thread.sleep(leaderState.getSyncInterval());
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Splits the files of a snapshot into InstallSnapshot requests, one chunk of
+   * at most the leader's snapshot chunk max size per request. Files are opened
+   * lazily, one at a time. The iteration is single-pass: all iterators share
+   * the same read position. {@link #close()} releases the file currently open,
+   * so an iteration abandoned early does not leak the stream.
+   */
+  protected class SnapshotRequestIter
+      implements Iterable<InstallSnapshotRequestProto>, AutoCloseable {
+    private final SnapshotInfo snapshot;
+    private final List<FileInfo> files;
+    private FileInputStream in;
+    private int fileIndex = 0;
+
+    private FileInfo currentFileInfo;
+    private byte[] currentBuf;
+    private long currentFileSize;
+    private long currentOffset = 0;
+    private int chunkIndex = 0;
+
+    private final String requestId;
+    private int requestIndex = 0;
+
+    public SnapshotRequestIter(SnapshotInfo snapshot, String requestId)
+        throws IOException {
+      this.snapshot = snapshot;
+      this.requestId = requestId;
+      this.files = snapshot.getFiles();
+      if (files.size() > 0) {
+        startReadFile();
+      }
+    }
+
+    /** Opens the file at {@code fileIndex} and resets the per-file read state. */
+    private void startReadFile() throws IOException {
+      currentFileInfo = files.get(fileIndex);
+      File snapshotFile = currentFileInfo.getPath().toFile();
+      currentFileSize = snapshotFile.length();
+      final int bufLength =
+          (int) Math.min(leaderState.getSnapshotChunkMaxSize(), currentFileSize);
+      currentBuf = new byte[bufLength];
+      currentOffset = 0;
+      chunkIndex = 0;
+      in = new FileInputStream(snapshotFile);
+    }
+
+    /**
+     * Closes the stream of the file currently being read, if any. Closing an
+     * already-closed stream has no effect, so this is safe after the iteration
+     * has completed normally.
+     */
+    @Override
+    public void close() {
+      if (in != null) {
+        try {
+          in.close();
+        } catch (IOException e) {
+          LOG.warn("Failed to close snapshot file " + currentFileInfo.getPath(), e);
+        }
+      }
+    }
+
+    @Override
+    public Iterator<InstallSnapshotRequestProto> iterator() {
+      return new Iterator<InstallSnapshotRequestProto>() {
+        @Override
+        public boolean hasNext() {
+          return fileIndex < files.size();
+        }
+
+        @Override
+        public InstallSnapshotRequestProto next() {
+          if (fileIndex >= files.size()) {
+            throw new NoSuchElementException();
+          }
+          int targetLength = (int) Math.min(currentFileSize - currentOffset,
+              leaderState.getSnapshotChunkMaxSize());
+          FileChunkProto chunk;
+          try {
+            chunk = readFileChunk(currentFileInfo, in, currentBuf,
+                targetLength, currentOffset, chunkIndex);
+            // only the last chunk of the last file completes the snapshot
+            boolean done = (fileIndex == files.size() - 1) &&
+                chunk.getDone();
+            InstallSnapshotRequestProto request =
+                server.createInstallSnapshotRequest(follower.getPeer().getId(),
+                    requestId, requestIndex++, snapshot,
+                    Lists.newArrayList(chunk), done);
+            currentOffset += targetLength;
+            chunkIndex++;
+
+            if (currentOffset >= currentFileSize) {
+              in.close();
+              fileIndex++;
+              if (fileIndex < files.size()) {
+                startReadFile();
+              }
+            }
+
+            return request;
+          } catch (IOException e) {
+            if (in != null) {
+              try {
+                in.close();
+              } catch (IOException ignored) {
+              }
+            }
+            LOG.warn("Got exception when preparing InstallSnapshot request", e);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+    }
+  }
+
+  /**
+   * Reads {@code length} bytes at the current position of {@code in} into
+   * {@code buf} and wraps them, together with the file's path relative to the
+   * storage root and its digest, into a chunk. The chunk is marked done when it
+   * ends at the file size recorded in {@code fileInfo}.
+   */
+  private FileChunkProto readFileChunk(FileInfo fileInfo,
+      FileInputStream in, byte[] buf, int length, long offset, int chunkIndex)
+      throws IOException {
+    FileChunkProto.Builder builder = FileChunkProto.newBuilder()
+        .setOffset(offset).setChunkIndex(chunkIndex);
+    IOUtils.readFully(in, buf, 0, length);
+    Path relativePath = server.getState().getStorage().getStorageDir()
+        .relativizeToRoot(fileInfo.getPath());
+    builder.setFilename(relativePath.toString());
+    builder.setDone(offset + length == fileInfo.getFileSize());
+    builder.setFileDigest(
+        ByteString.copyFrom(fileInfo.getFileDigest().getDigest()));
+    builder.setData(ByteString.copyFrom(buf, 0, length));
+    return builder.build();
+  }
+
+  /**
+   * Streams the snapshot to the follower chunk by chunk. On success the
+   * follower's match/next indices are advanced past the snapshot.
+   *
+   * @return the first unsuccessful reply; the last reply if every chunk
+   *         succeeded; or null if sending failed with an exception or the
+   *         snapshot has no files
+   */
+  private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot)
+      throws InterruptedException, InterruptedIOException {
+    String requestId = UUID.randomUUID().toString();
+    InstallSnapshotReplyProto reply = null;
+    // try-with-resources closes the snapshot file stream on every exit path
+    // (unsuccessful reply, RPC exception, interruption), not only after the
+    // last chunk has been read; otherwise each failed attempt leaks a stream.
+    try (SnapshotRequestIter requests =
+             new SnapshotRequestIter(snapshot, requestId)) {
+      for (InstallSnapshotRequestProto request : requests) {
+        follower.updateLastRpcSendTime();
+        reply = server.getServerRpc().installSnapshot(request);
+        follower.updateLastRpcResponseTime();
+
+        if (!reply.getServerReply().getSuccess()) {
+          return reply;
+        }
+      }
+    } catch (InterruptedIOException iioe) {
+      throw iioe;
+    } catch (Exception ioe) {
+      LOG.warn(this + ": failed to install SnapshotInfo " + snapshot.getFiles(),
+          ioe);
+      return null;
+    }
+
+    if (reply != null) {
+      follower.updateMatchIndex(snapshot.getTermIndex().getIndex());
+      follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1);
+      LOG.info("{}: install snapshot-{} successfully on follower {}",
+          server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer());
+    }
+    return reply;
+  }
+
+  /**
+   * @return the latest snapshot if the follower needs to catch up via snapshot
+   *         installation, otherwise null
+   */
+  protected SnapshotInfo shouldInstallSnapshot() {
+    final long logStartIndex = raftLog.getStartIndex();
+    // we should install snapshot if the follower needs to catch up and:
+    // 1. there is no local log entry but there is snapshot
+    // 2. or the follower's next index is smaller than the log start index
+    if (follower.getNextIndex() < raftLog.getNextIndex()) {
+      SnapshotInfo snapshot = server.getState().getLatestSnapshot();
+      if (follower.getNextIndex() < logStartIndex ||
+          (logStartIndex == INVALID_LOG_INDEX && snapshot != null)) {
+        return snapshot;
+      }
+    }
+    return null;
+  }
+
+  /** Check and send appendEntries RPC */
+  private void checkAndSendAppendEntries()
+      throws InterruptedException, InterruptedIOException {
+    while (isAppenderRunning()) {
+      if (shouldSendRequest()) {
+        SnapshotInfo snapshot = shouldInstallSnapshot();
+        if (snapshot != null) {
+          LOG.info("{}: follower {}'s next index is {}," +
+                  " log's start index is {}, need to install snapshot",
+              server.getId(), follower.getPeer(), follower.getNextIndex(),
+              raftLog.getStartIndex());
+
+          final InstallSnapshotReplyProto r = installSnapshot(snapshot);
+          if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
+            checkResponseTerm(r.getTerm());
+          } // otherwise if r is null, retry the snapshot installation
+          // NOTE(review): the follower is still behind after a failed
+          // installation, so the wait below is skipped and the installation is
+          // retried immediately.
+        } else {
+          final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
+          if (r != null) {
+            handleReply(r);
+          }
+        }
+      }
+      if (isAppenderRunning() && !shouldAppendEntries(
+          follower.getNextIndex() + buffer.getPendingEntryNum())) {
+        // nothing new to append: sleep until a heartbeat is due or notifyAppend()
+        final long waitTime = getHeartbeatRemainingTime(
+            follower.getLastRpcTime());
+        if (waitTime > 0) {
+          synchronized (this) {
+            wait(waitTime);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Updates the follower's progress from an appendEntries reply: advances
+   * match/next index on success, checks for a higher term on NOT_LEADER, and
+   * moves the next index back on INCONSISTENCY.
+   *
+   * @throws IllegalStateException if a successful reply moves next index backwards
+   */
+  private void handleReply(AppendEntriesReplyProto reply) {
+    if (reply != null) {
+      switch (reply.getResult()) {
+        case SUCCESS:
+          final long oldNextIndex = follower.getNextIndex();
+          final long nextIndex = reply.getNextIndex();
+          if (nextIndex < oldNextIndex) {
+            throw new IllegalStateException("nextIndex=" + nextIndex
+                + " < oldNextIndex=" + oldNextIndex
+                + ", reply=" + ProtoUtils.toString(reply));
+          }
+
+          if (nextIndex > oldNextIndex) {
+            follower.updateMatchIndex(nextIndex - 1);
+            follower.updateNextIndex(nextIndex);
+            submitEventOnSuccessAppend();
+          }
+          break;
+        case NOT_LEADER:
+          // check if should step down
+          checkResponseTerm(reply.getTerm());
+          break;
+        case INCONSISTENCY:
+          follower.decreaseNextIndex(reply.getNextIndex());
+          break;
+        case UNRECOGNIZED:
+          LOG.warn("{} received UNRECOGNIZED AppendResult from {}",
+              server.getId(), follower.getPeer().getId());
+          break;
+      }
+    }
+  }
+
+  /**
+   * Tells the leader state about the progress: a commit update for a voting
+   * follower, a staging-progress update for a non-voting one.
+   */
+  protected void submitEventOnSuccessAppend() {
+    LeaderState.StateUpdateEvent e = follower.isAttendingVote() ?
+        LeaderState.UPDATE_COMMIT_EVENT :
+        LeaderState.STAGING_PROGRESS_EVENT;
+    leaderState.submitUpdateStateEvent(e);
+  }
+
+  /** Wakes up the sending loop if it is waiting for new entries. */
+  public synchronized void notifyAppend() {
+    this.notify();
+  }
+
+  /** Should the leader send appendEntries RPC to this follower? */
+  protected boolean shouldSendRequest() {
+    return shouldAppendEntries(follower.getNextIndex()) || shouldHeartbeat();
+  }
+
+  /** @return whether the leader's log has entries at or after {@code followerIndex} */
+  private boolean shouldAppendEntries(long followerIndex) {
+    return followerIndex < raftLog.getNextIndex();
+  }
+
+  private boolean shouldHeartbeat() {
+    return getHeartbeatRemainingTime(follower.getLastRpcTime()) <= 0;
+  }
+
+  /**
+   * @return the time in milliseconds until the next heartbeat is due: half of
+   *         the server's minimum timeout minus the time elapsed since
+   *         {@code lastTime}; zero or negative means a heartbeat is due now
+   */
+  protected long getHeartbeatRemainingTime(Timestamp lastTime) {
+    return server.getMinTimeoutMs() / 2 - lastTime.elapsedTimeMs();
+  }
+
+  /**
+   * Submits a step-down event to the leader state if this appender is running,
+   * the follower attends votes, and it reported a term newer than the leader's.
+   */
+  protected void checkResponseTerm(long responseTerm) {
+    synchronized (server) {
+      if (isAppenderRunning() && follower.isAttendingVote()
+          && responseTerm > leaderState.getCurrentTerm()) {
+        leaderState.submitUpdateStateEvent(
+            new LeaderState.StateUpdateEvent(StateUpdateEventType.STEPDOWN,
+                responseTerm));
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java
new file mode 100644
index 0000000..e6cc213
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+/**
+ * Creates the {@link LogAppender} that replicates the leader's log to one
+ * follower.
+ */
+public interface LogAppenderFactory {
+  /**
+   * @param server the local server
+   * @param state the leader state of that server
+   * @param f the follower to append to
+   * @return the appender for the follower
+   */
+  LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
+      FollowerInfo f);
+
+  /** Creates a plain {@link LogAppender} for each follower. */
+  class SynchronousLogAppenderFactory implements LogAppenderFactory {
+    @Override
+    public LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
+        FollowerInfo f) {
+      final LogAppender appender = new LogAppender(server, state, f);
+      return appender;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
new file mode 100644
index 0000000..b532303
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import com.google.common.base.Preconditions;
+
+import java.util.*;
+
+import org.apache.ratis.protocol.RaftPeer;
+
+/**
+ * The peer configuration of a raft cluster.
+ *
+ * The objects of this class are immutable.
+ */
+class PeerConfiguration {
+  /** The peers keyed by their ids; an unmodifiable map. */
+  private final Map<String, RaftPeer> peers;
+
+  /**
+   * @param peers the peers of the configuration; if two peers share an id,
+   *              the later one wins
+   * @throws NullPointerException if {@code peers} is null
+   * @throws IllegalStateException if {@code peers} is empty
+   */
+  PeerConfiguration(Iterable<RaftPeer> peers) {
+    Preconditions.checkNotNull(peers);
+    Map<String, RaftPeer> map = new HashMap<>();
+    for (RaftPeer p : peers) {
+      map.put(p.getId(), p);
+    }
+    this.peers = Collections.unmodifiableMap(map);
+    Preconditions.checkState(!this.peers.isEmpty());
+  }
+
+  /** @return a read-only view of all the peers */
+  Collection<RaftPeer> getPeers() {
+    return Collections.unmodifiableCollection(peers.values());
+  }
+
+  int size() {
+    return peers.size();
+  }
+
+  @Override
+  public String toString() {
+    return peers.values().toString();
+  }
+
+  /** @return the peer with the given id, or null if it is not a member */
+  RaftPeer getPeer(String id) {
+    return peers.get(id);
+  }
+
+  boolean contains(String id) {
+    return peers.containsKey(id);
+  }
+
+  /** @return a new mutable list of all peers whose id differs from {@code selfId} */
+  List<RaftPeer> getOtherPeers(String selfId) {
+    List<RaftPeer> others = new ArrayList<>();
+    for (RaftPeer peer : peers.values()) {
+      if (!selfId.equals(peer.getId())) {
+        others.add(peer);
+      }
+    }
+    return others;
+  }
+
+  /**
+   * Do {@code selfId} (when it is a member) and the members listed in
+   * {@code others} form a strict majority of this configuration?
+   *
+   * @param others ids of other peers to count; ids that are not members are
+   *               ignored; must not contain {@code selfId}
+   * @param selfId the id of the local peer
+   * @throws IllegalArgumentException if {@code others} contains {@code selfId}
+   */
+  boolean hasMajority(Collection<String> others, String selfId) {
+    Preconditions.checkArgument(!others.contains(selfId));
+    final int majority = size() / 2 + 1;
+    int num = contains(selfId) ? 1 : 0;
+    // Check before the loop as well: in a single-member configuration the
+    // local peer alone is the majority even when others is empty.
+    if (num >= majority) {
+      return true;
+    }
+    for (String other : others) {
+      if (contains(other) && ++num >= majority) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
new file mode 100644
index 0000000..bf47cdc
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.statemachine.TransactionContext;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A client request waiting for its reply, together with the log index it was
+ * appended at and a future that is completed with the reply. Instances are
+ * ordered by log index.
+ */
+public class PendingRequest implements Comparable<PendingRequest> {
+  // primitive: the index is never null, and boxing it only adds unboxing
+  // on every getIndex()/compareTo() call
+  private final long index;
+  private final RaftClientRequest request;
+  private final TransactionContext entry;
+  private final CompletableFuture<RaftClientReply> future;
+
+  PendingRequest(long index, RaftClientRequest request,
+      TransactionContext entry) {
+    this.index = index;
+    this.request = request;
+    this.entry = entry;
+    this.future = new CompletableFuture<>();
+  }
+
+  /** A setConfiguration request: no log index yet and no transaction context. */
+  PendingRequest(SetConfigurationRequest request) {
+    this(RaftServerConstants.INVALID_LOG_INDEX, request, null);
+  }
+
+  long getIndex() {
+    return index;
+  }
+
+  RaftClientRequest getRequest() {
+    return request;
+  }
+
+  /** @return the future completed by {@link #setReply} or {@link #setException} */
+  public CompletableFuture<RaftClientReply> getFuture() {
+    return future;
+  }
+
+  /** @return the transaction context; null for setConfiguration requests */
+  TransactionContext getEntry() {
+    return entry;
+  }
+
+  /** Completes the future exceptionally; no effect if it is already completed. */
+  synchronized void setException(Throwable e) {
+    Preconditions.checkArgument(e != null);
+    future.completeExceptionally(e);
+  }
+
+  /** Completes the future with the reply; no effect if it is already completed. */
+  synchronized void setReply(RaftClientReply r) {
+    Preconditions.checkArgument(r != null);
+    future.complete(r);
+  }
+
+  void setSuccessReply(Message message) {
+    setReply(new RaftClientReply(getRequest(), message));
+  }
+
+  @Override
+  public int compareTo(PendingRequest that) {
+    return Long.compare(this.index, that.index);
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "(index=" + index
+        + ", request=" + request + ")";
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
new file mode 100644
index 0000000..6343344
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+/**
+ * Tracks the client requests the leader has appended to its log but not yet
+ * replied to, keyed by log index, plus at most one pending setConfiguration
+ * request.
+ */
+class PendingRequests {
+  private static final Logger LOG = RaftServerImpl.LOG;
+
+  private PendingRequest pendingSetConf;
+  private final RaftServerImpl server;
+  private final ConcurrentMap<Long, PendingRequest> pendingRequests = new ConcurrentHashMap<>();
+  /** The most recently added request; used to check that indices are consecutive. */
+  private PendingRequest last = null;
+
+  PendingRequests(RaftServerImpl server) {
+    this.server = server;
+  }
+
+  /**
+   * Registers a write request appended at {@code index}.
+   *
+   * @throws IllegalArgumentException if the request is read-only
+   * @throws IllegalStateException if {@code index} does not directly follow the
+   *         previously added index
+   */
+  PendingRequest addPendingRequest(long index, RaftClientRequest request,
+      TransactionContext entry) {
+    // externally synced for now
+    Preconditions.checkArgument(!request.isReadOnly());
+    Preconditions.checkState(last == null || index == last.getIndex() + 1);
+    return add(index, request, entry);
+  }
+
+  private PendingRequest add(long index, RaftClientRequest request,
+      TransactionContext entry) {
+    final PendingRequest pending = new PendingRequest(index, request, entry);
+    pendingRequests.put(index, pending);
+    last = pending;
+    return pending;
+  }
+
+  /** @throws IllegalStateException if a setConfiguration request is already pending */
+  PendingRequest addConfRequest(SetConfigurationRequest request) {
+    Preconditions.checkState(pendingSetConf == null);
+    pendingSetConf = new PendingRequest(request);
+    return pendingSetConf;
+  }
+
+  void replySetConfiguration() {
+    // we allow the pendingRequest to be null in case that the new leader
+    // commits the new configuration while it has not received the retry
+    // request from the client
+    if (pendingSetConf != null) {
+      // for setConfiguration we do not need to wait for statemachine. send back
+      // reply after it's committed.
+      pendingSetConf.setSuccessReply(null);
+      pendingSetConf = null;
+    }
+  }
+
+  /** @throws IllegalStateException if no setConfiguration request is pending */
+  void failSetConfiguration(RaftException e) {
+    Preconditions.checkState(pendingSetConf != null);
+    pendingSetConf.setException(e);
+    pendingSetConf = null;
+  }
+
+  TransactionContext getTransactionContext(long index) {
+    PendingRequest pendingRequest = pendingRequests.get(index);
+    // it is possible that the pendingRequest is null if this peer just becomes
+    // the new leader and commits transactions received by the previous leader
+    return pendingRequest != null ? pendingRequest.getEntry() : null;
+  }
+
+  /**
+   * Stops tracking the request at {@code index} and replies to it once the
+   * state machine completes {@code messageFuture}: a success reply with the
+   * message, or the exception. Does nothing if no request is tracked there.
+   */
+  void replyPendingRequest(long index, CompletableFuture<Message> messageFuture) {
+    // Remove rather than get: otherwise every request ever appended stays in
+    // the map for the lifetime of the leader, and already-answered entries are
+    // later reported to the state machine by sendNotLeaderResponses().
+    final PendingRequest pending = pendingRequests.remove(index);
+    if (pending != null) {
+      Preconditions.checkState(pending.getIndex() == index);
+
+      messageFuture.whenComplete((reply, exception) -> {
+        if (exception == null) {
+          pending.setSuccessReply(reply);
+        } else {
+          pending.setException(exception);
+        }
+      });
+    }
+  }
+
+  /**
+   * The leader state is stopped. Send NotLeaderException to all the pending
+   * requests since they have not got applied to the state machine yet.
+   *
+   * @throws IOException if the state machine's notifyNotLeader throws it
+   */
+  void sendNotLeaderResponses() throws IOException {
+    LOG.info("{} sends responses before shutting down PendingRequestsHandler",
+        server.getId());
+
+    Collection<TransactionContext> pendingEntries = pendingRequests.values().stream()
+        .map(PendingRequest::getEntry).collect(Collectors.toList());
+    // notify the state machine about stepping down
+    server.getStateMachine().notifyNotLeader(pendingEntries);
+    pendingRequests.values().forEach(this::setNotLeaderException);
+    if (pendingSetConf != null) {
+      setNotLeaderException(pendingSetConf);
+    }
+  }
+
+  private void setNotLeaderException(PendingRequest pending) {
+    RaftClientReply reply = new RaftClientReply(pending.getRequest(),
+        server.generateNotLeaderException());
+    pending.setReply(reply);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
new file mode 100644
index 0000000..8fdd628
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.ratis.protocol.RaftPeer;
+
+/**
+ * The configuration of the raft cluster.
+ *
+ * The configuration is stable if there is no on-going peer change. Otherwise,
+ * the configuration is transitional, i.e. in the middle of a peer change.
+ *
+ * The objects of this class are immutable.
+ */
+public class RaftConfiguration {
+ /** Create a {@link Builder}. */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /** To build {@link RaftConfiguration} objects. */
+ public static class Builder {
+ private PeerConfiguration oldConf;
+ private PeerConfiguration conf;
+ private long logEntryIndex = RaftServerConstants.INVALID_LOG_INDEX;
+
+ private boolean forceStable = false;
+ private boolean forceTransitional = false;
+
+ private Builder() {}
+
+ public Builder setConf(PeerConfiguration conf) {
+ Preconditions.checkNotNull(conf);
+ Preconditions.checkState(this.conf == null, "conf is already set.");
+ this.conf = conf;
+ return this;
+ }
+
+ public Builder setConf(Iterable<RaftPeer> peers) {
+ return setConf(new PeerConfiguration(peers));
+ }
+
+ public Builder setConf(RaftPeer[] peers) {
+ return setConf(Arrays.asList(peers));
+ }
+
+ Builder setConf(RaftConfiguration transitionalConf) {
+ Preconditions.checkNotNull(transitionalConf);
+ Preconditions.checkState(transitionalConf.isTransitional());
+
+ Preconditions.checkState(!forceTransitional);
+ forceStable = true;
+ return setConf(transitionalConf.conf);
+ }
+
+
+ public Builder setOldConf(PeerConfiguration oldConf) {
+ Preconditions.checkNotNull(oldConf);
+ Preconditions.checkState(this.oldConf == null, "oldConf is already set.");
+ this.oldConf = oldConf;
+ return this;
+ }
+
+ public Builder setOldConf(Iterable<RaftPeer> oldPeers) {
+ return setOldConf(new PeerConfiguration(oldPeers));
+ }
+
+ public Builder setOldConf(RaftPeer[] oldPeers) {
+ return setOldConf(Arrays.asList(oldPeers));
+ }
+
+ Builder setOldConf(RaftConfiguration stableConf) {
+ Preconditions.checkNotNull(stableConf);
+ Preconditions.checkState(stableConf.isStable());
+
+ Preconditions.checkState(!forceStable);
+ forceTransitional = true;
+ return setOldConf(stableConf.conf);
+ }
+
+ public Builder setLogEntryIndex(long logEntryIndex) {
+ Preconditions.checkArgument(
+ logEntryIndex != RaftServerConstants.INVALID_LOG_INDEX);
+ Preconditions.checkState(
+ this.logEntryIndex == RaftServerConstants.INVALID_LOG_INDEX,
+ "logEntryIndex is already set.");
+ this.logEntryIndex = logEntryIndex;
+ return this;
+ }
+
+ /** Build a {@link RaftConfiguration}. */
+ public RaftConfiguration build() {
+ if (forceTransitional) {
+ Preconditions.checkState(oldConf != null);
+ }
+ if (forceStable) {
+ Preconditions.checkState(oldConf == null);
+ }
+ return new RaftConfiguration(conf, oldConf, logEntryIndex);
+ }
+ }
+
+ /** Non-null only if this configuration is transitional. */
+ private final PeerConfiguration oldConf;
+ /**
+ * The current peer configuration while this configuration is stable;
+ * or the new peer configuration while this configuration is transitional.
+ */
+ private final PeerConfiguration conf;
+
+ /** The index of the corresponding log entry for this configuration. */
+ private final long logEntryIndex;
+
+ private RaftConfiguration(PeerConfiguration conf, PeerConfiguration oldConf,
+ long logEntryIndex) {
+ Preconditions.checkNotNull(conf);
+ this.conf = conf;
+ this.oldConf = oldConf;
+ this.logEntryIndex = logEntryIndex;
+ }
+
+ /** Is this configuration transitional, i.e. in the middle of a peer change? */
+ boolean isTransitional() {
+ return oldConf != null;
+ }
+
+ /** Is this configuration stable, i.e. no on-going peer change? */
+ boolean isStable() {
+ return oldConf == null;
+ }
+
+ boolean containsInConf(String peerId) {
+ return conf.contains(peerId);
+ }
+
+ boolean containsInOldConf(String peerId) {
+ return oldConf != null && oldConf.contains(peerId);
+ }
+
+ boolean contains(String peerId) {
+ return containsInConf(peerId) && (oldConf == null || containsInOldConf(peerId));
+ }
+
+ /**
+ * @return the peer corresponding to the given id;
+ * or return null if the peer is not in this configuration.
+ */
+ public RaftPeer getPeer(String id) {
+ if (id == null) {
+ return null;
+ }
+ RaftPeer peer = conf.getPeer(id);
+ if (peer != null) {
+ return peer;
+ } else if (oldConf != null) {
+ return oldConf.getPeer(id);
+ }
+ return null;
+ }
+
+ /** @return all the peers from the conf, and the old conf if it exists. */
+ public Collection<RaftPeer> getPeers() {
+ final Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers());
+ if (oldConf != null) {
+ oldConf.getPeers().stream().filter(p -> !peers.contains(p))
+ .forEach(peers::add);
+ }
+ return peers;
+ }
+
+ /**
+ * @return all the peers other than the given self id from the conf,
+ * and the old conf if it exists.
+ */
+ public Collection<RaftPeer> getOtherPeers(String selfId) {
+ Collection<RaftPeer> others = conf.getOtherPeers(selfId);
+ if (oldConf != null) {
+ oldConf.getOtherPeers(selfId).stream()
+ .filter(p -> !others.contains(p))
+ .forEach(others::add);
+ }
+ return others;
+ }
+
+ /** @return true if the self id together with the others are in the majority. */
+ boolean hasMajority(Collection<String> others, String selfId) {
+ Preconditions.checkArgument(!others.contains(selfId));
+ return conf.hasMajority(others, selfId) &&
+ (oldConf == null || oldConf.hasMajority(others, selfId));
+ }
+
+ @Override
+ public String toString() {
+ return conf + (oldConf != null ? "old:" + oldConf : "");
+ }
+
+ @VisibleForTesting
+ boolean hasNoChange(RaftPeer[] newMembers) {
+ if (!isStable() || conf.size() != newMembers.length) {
+ return false;
+ }
+ for (RaftPeer peer : newMembers) {
+ if (!conf.contains(peer.getId())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ long getLogEntryIndex() {
+ return logEntryIndex;
+ }
+
+ static Collection<RaftPeer> computeNewPeers(RaftPeer[] newMembers,
+ RaftConfiguration old) {
+ List<RaftPeer> peers = new ArrayList<>();
+ for (RaftPeer p : newMembers) {
+ if (!old.containsInConf(p.getId())) {
+ peers.add(p);
+ }
+ }
+ return peers;
+ }
+
+ RaftPeer getRandomPeer(String exclusiveId) {
+ final List<RaftPeer> peers = conf.getOtherPeers(exclusiveId);
+ if (peers.isEmpty()) {
+ return null;
+ }
+ final int index = ThreadLocalRandom.current().nextInt(peers.size());
+ return peers.get(index);
+ }
+
+ Collection<RaftPeer> getPeersInOldConf() {
+ return oldConf != null ? oldConf.getPeers() : Collections.emptyList();
+ }
+
+ Collection<RaftPeer> getPeersInConf() {
+ return conf.getPeers();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
new file mode 100644
index 0000000..caf9c4d
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.client.RaftClient;
+
+public interface RaftServerConstants {
+ long INVALID_LOG_INDEX = -1;
+ byte LOG_TERMINATE_BYTE = 0;
+ long DEFAULT_SEQNUM = RaftClient.DEFAULT_SEQNUM;
+
+ enum StartupOption {
+ FORMAT("format"),
+ REGULAR("regular");
+
+ private final String option;
+
+ StartupOption(String arg) {
+ this.option = arg;
+ }
+
+ public static StartupOption getOption(String arg) {
+ for (StartupOption s : StartupOption.values()) {
+ if (s.option.equals(arg)) {
+ return s;
+ }
+ }
+ return REGULAR;
+ }
+ }
+}