You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2019/03/28 10:24:28 UTC
[incubator-ratis] branch master updated: RATIS-503. RaftLog is
accessed by LeaderElection after closed.
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 66d0826 RATIS-503. RaftLog is accessed by LeaderElection after closed.
66d0826 is described below
commit 66d082600d494feb6e9e0b521bba47e2d7c77961
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Mar 28 18:24:02 2019 +0800
RATIS-503. RaftLog is accessed by LeaderElection after closed.
---
.../main/java/org/apache/ratis/util/Daemon.java | 9 +-
.../main/java/org/apache/ratis/util/LifeCycle.java | 49 ++++--
.../main/java/org/apache/ratis/util/LogUtils.java | 12 ++
.../java/org/apache/ratis/util/PeerProxyMap.java | 2 +-
.../java/org/apache/ratis/util/TimeDuration.java | 5 +
.../apache/ratis/server/impl/LeaderElection.java | 173 +++++++++++++--------
.../apache/ratis/server/impl/RaftServerImpl.java | 4 +-
.../org/apache/ratis/server/impl/RoleInfo.java | 2 +-
.../org/apache/ratis/server/impl/ServerState.java | 33 ++--
.../apache/ratis/server/protocol/TermIndex.java | 7 +-
.../server/simulation/SimulatedServerRpc.java | 23 ++-
11 files changed, 219 insertions(+), 100 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
index 1ef95ae..b311854 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -29,7 +29,12 @@ public class Daemon extends Thread {
/** Construct a daemon thread with the given runnable. */
public Daemon(Runnable runnable) {
+ this(runnable, runnable.toString());
+ }
+
+ /** Construct a daemon thread with the given runnable. */
+ public Daemon(Runnable runnable, String name) {
super(runnable);
- this.setName(runnable.toString());
+ this.setName(name);
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
index dd91c28..200a99e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
@@ -18,6 +18,7 @@
package org.apache.ratis.util;
import org.apache.ratis.util.function.CheckedRunnable;
+import org.apache.ratis.util.function.CheckedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,6 +80,16 @@ public class LifeCycle {
return false;
}
+ /** Is this {@link State#RUNNING}? */
+ public boolean isRunning() {
+ return this == RUNNING;
+ }
+
+ /** Is this {@link State#CLOSING} or {@link State#CLOSED}? */
+ public boolean isClosingOrClosed() {
+ return this == State.CLOSING || this == State.CLOSED;
+ }
+
static void put(State key, Map<State, List<State>> map, State... values) {
map.put(key, Collections.unmodifiableList(Arrays.asList(values)));
}
@@ -197,31 +208,47 @@ public class LifeCycle {
}
}
+ /**
+ * Check the current state and, if applicable, transit to {@link State#CLOSING}.
+ * If this is already in {@link State#CLOSING} or {@link State#CLOSED},
+ * then invoking this method has no effect.
+ * In other words, this method can be safely called multiple times.
+ */
+ public State checkStateAndClose() {
+ return checkStateAndClose(() -> State.CLOSING);
+ }
/**
* Check the current state and, if applicable, run the given close method.
- * This method can be called multiple times
+ * If this is already in {@link State#CLOSING} or {@link State#CLOSED},
+ * then invoking this method has no effect.
+ * In other words, this method can be safely called multiple times
* while the given close method will only be executed at most once.
*/
- public <T extends Throwable> void checkStateAndClose(
- CheckedRunnable<T> closeImpl) throws T {
+ public <T extends Throwable> State checkStateAndClose(CheckedRunnable<T> closeMethod) throws T {
+ return checkStateAndClose(() -> {
+ try {
+ closeMethod.run();
+ } finally {
+ transition(State.CLOSED);
+ }
+ return State.CLOSED;
+ });
+ }
+
+ private <T extends Throwable> State checkStateAndClose(CheckedSupplier<State, T> closeMethod) throws T {
if (compareAndTransition(State.NEW, State.CLOSED)) {
- return;
+ return State.CLOSED;
}
for(;;) {
final State c = getCurrentState();
if (c.isOneOf(State.CLOSING, State.CLOSED)) {
- return; //already closing or closed.
+ return c; //already closing or closed.
}
if (compareAndTransition(c, State.CLOSING)) {
- try {
- closeImpl.run();
- } finally {
- transition(State.CLOSED);
- }
- return;
+ return closeMethod.get();
}
// lifecycle state is changed, retry.
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
index d30bae1..92a03cc 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
@@ -137,4 +137,16 @@ public interface LogUtils {
}
}
}
+
+ static void infoOrTrace(Logger log, String message, Throwable t) {
+ infoOrTrace(log, () -> message, t);
+ }
+
+ static void infoOrTrace(Logger log, Supplier<String> message, Throwable t) {
+ if (log.isTraceEnabled()) {
+ log.trace(message.get(), t);
+ } else {
+ log.info("{}: {}", message.get(), t);
+ }
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index b2720ff..014dc30 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -54,7 +54,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
synchronized (this) {
if (proxy == null) {
final LifeCycle.State current = lifeCycle.getCurrentState();
- if (current.isOneOf(LifeCycle.State.CLOSING, LifeCycle.State.CLOSED)) {
+ if (current.isClosingOrClosed()) {
throw new AlreadyClosedException(name + " is already " + current);
}
lifeCycle.startAndTransition(
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index a5ebd55..b5205f3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -196,6 +196,11 @@ public final class TimeDuration implements Comparable<TimeDuration> {
return duration < 0;
}
+ /** @return Is this {@link TimeDuration} less than or equal to zero? */
+ public boolean isNonPositive() {
+ return duration <= 0;
+ }
+
/** Performs a {@link TimeUnit#sleep(long)} using this {@link TimeDuration}. */
public void sleep() throws InterruptedException {
unit.sleep(duration);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index fd02632..796f5e2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -17,15 +17,18 @@
*/
package org.apache.ratis.server.impl;
+import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
-import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,23 +37,28 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-class LeaderElection extends Daemon {
+class LeaderElection implements Runnable {
public static final Logger LOG = LoggerFactory.getLogger(LeaderElection.class);
private ResultAndTerm logAndReturn(Result result,
List<RequestVoteReplyProto> responses,
List<Exception> exceptions, long newTerm) {
- LOG.info(server.getId() + ": Election " + result + "; received "
- + responses.size() + " response(s) "
+ LOG.info(this + ": Election " + result + "; received " + responses.size() + " response(s) "
+ responses.stream().map(ProtoUtils::toString).collect(Collectors.toList())
+ " and " + exceptions.size() + " exception(s); " + server.getState());
int i = 0;
for(Exception e : exceptions) {
- LOG.info(" " + i++ + ": " + e);
- LOG.trace("TRACE", e);
+ final int j = i++;
+ LogUtils.infoOrTrace(LOG, () -> " Exception " + j, e);
}
return new ResultAndTerm(result, newTerm);
}
@@ -67,33 +75,52 @@ class LeaderElection extends Daemon {
}
}
+ static class Executor {
+ private final ExecutorCompletionService<RequestVoteReplyProto> service;
+ private final ExecutorService executor;
+
+ private final AtomicInteger count = new AtomicInteger();
+
+ Executor(Object name, int size) {
+ Preconditions.assertTrue(size > 0);
+ executor = Executors.newFixedThreadPool(size, r -> new Daemon(r, name + "-" + count.incrementAndGet()));
+ service = new ExecutorCompletionService<>(executor);
+ }
+
+ void shutdown() {
+ executor.shutdown();
+ }
+
+ void submit(Callable<RequestVoteReplyProto> task) {
+ service.submit(task);
+ }
+
+ Future<RequestVoteReplyProto> poll(TimeDuration waitTime) throws InterruptedException {
+ return service.poll(waitTime.getDuration(), waitTime.getUnit());
+ }
+ }
+
+ private static final AtomicInteger COUNT = new AtomicInteger();
+
+ private final String name;
+ private final LifeCycle lifeCycle;
+ private final Daemon daemon;
+
private final RaftServerImpl server;
- private ExecutorCompletionService<RequestVoteReplyProto> service;
- private ExecutorService executor;
- private volatile boolean running;
- /**
- * The Raft configuration should not change while the peer is in candidate
- * state. If the configuration changes, another peer should be acting as a
- * leader and this LeaderElection session should end.
- */
- private final RaftConfiguration conf;
- private final Collection<RaftPeer> others;
LeaderElection(RaftServerImpl server) {
+ this.name = server.getId() + ":" + server.getGroupId() + ":" + getClass().getSimpleName() + COUNT.incrementAndGet();
+ this.lifeCycle = new LifeCycle(this);
+ this.daemon = new Daemon(this);
this.server = server;
- conf = server.getRaftConf();
- others = conf.getOtherPeers(server.getId());
- this.running = true;
}
- void stopRunning() {
- this.running = false;
+ void start() {
+ lifeCycle.startAndTransition(daemon::start);
}
- private void initExecutor() {
- Preconditions.assertTrue(!others.isEmpty());
- executor = Executors.newFixedThreadPool(others.size(), Daemon::new);
- service = new ExecutorCompletionService<>(executor);
+ void shutdown() {
+ lifeCycle.checkStateAndClose();
}
@Override
@@ -103,29 +130,54 @@ class LeaderElection extends Daemon {
} catch (InterruptedException e) {
// the leader election thread is interrupted. The peer may already step
// down to a follower. The leader election should skip.
- LOG.info(server.getId() + " " + getClass().getSimpleName()
- + " thread is interrupted gracefully; server=" + server);
- } catch (IOException e) {
- LOG.warn("Failed to persist votedFor/term. Exit the leader election.", e);
- stopRunning();
+ LOG.info("{} thread is interrupted gracefully; server={}", this, server);
+ } catch(Throwable e) {
+ final LifeCycle.State state = lifeCycle.getCurrentState();
+ final String message = "Failed " + this + ", state=" + state;
+
+ if (state.isClosingOrClosed()) {
+ LogUtils.infoOrTrace(LOG, message, e);
+ LOG.info("{}: {} is safely ignored since this is already {}",
+ this, e.getClass().getSimpleName(), state);
+ } else {
+ if (!server.isAlive()) {
+ LogUtils.infoOrTrace(LOG, message, e);
+ LOG.info("{}: {} is safely ignored since the server is not alive: {}",
+ this, e.getClass().getSimpleName(), server);
+ } else {
+ LOG.error(message, e);
+ }
+ shutdown();
+ }
+ } finally {
+ lifeCycle.transition(LifeCycle.State.CLOSED);
}
}
+ private boolean shouldRun() {
+ return lifeCycle.getCurrentState().isRunning() && server.isCandidate() && server.isAlive();
+ }
+
+ private boolean shouldRun(long electionTerm) {
+ return shouldRun() && server.getState().getCurrentTerm() == electionTerm;
+ }
+
/**
* After a peer changes its role to candidate, it invokes this method to
* send out requestVote rpc to all other peers.
*/
private void askForVotes() throws InterruptedException, IOException {
final ServerState state = server.getState();
- while (running && server.isCandidate()) {
+ while (shouldRun()) {
// one round of requestVotes
final long electionTerm;
+ final RaftConfiguration conf;
synchronized (server) {
electionTerm = state.initElection();
- server.getState().persistMetadata();
+ conf = state.getRaftConf();
+ state.persistMetadata();
}
- LOG.info(state.getSelfId() + ": begin an election in Term "
- + electionTerm);
+ LOG.info("{}: begin an election at term {} for {}", this, electionTerm, conf);
TermIndex lastEntry = state.getLog().getLastEntryTermIndex();
if (lastEntry == null) {
@@ -137,24 +189,22 @@ class LeaderElection extends Daemon {
}
final ResultAndTerm r;
+ final Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
if (others.isEmpty()) {
r = new ResultAndTerm(Result.PASSED, electionTerm);
} else {
+ final Executor voteExecutor = new Executor(this, others.size());
try {
- initExecutor();
- int submitted = submitRequests(electionTerm, lastEntry);
- r = waitForResults(electionTerm, submitted);
+ final int submitted = submitRequests(electionTerm, lastEntry, others, voteExecutor);
+ r = waitForResults(electionTerm, submitted, conf, voteExecutor);
} finally {
- if (executor != null) {
- executor.shutdown();
- }
+ voteExecutor.shutdown();
}
}
synchronized (server) {
- if (electionTerm != state.getCurrentTerm() || !running ||
- !server.isCandidate()) {
- return; // term already passed or no longer a candidate.
+ if (!shouldRun(electionTerm)) {
+ return; // term already passed or this should not run anymore.
}
switch (r.result) {
@@ -162,14 +212,12 @@ class LeaderElection extends Daemon {
server.changeToLeader();
return;
case SHUTDOWN:
- LOG.info("{} received shutdown response when requesting votes.",
- server.getId());
+ LOG.info("{} received shutdown response when requesting votes.", this);
server.getProxy().close();
return;
case REJECTED:
case DISCOVERED_A_NEW_TERM:
- final long term = r.term > server.getState().getCurrentTerm() ?
- r.term : server.getState().getCurrentTerm();
+ final long term = Math.max(r.term, state.getCurrentTerm());
server.changeToFollowerAndPersistMetadata(term, Result.DISCOVERED_A_NEW_TERM);
return;
case TIMEOUT:
@@ -179,34 +227,33 @@ class LeaderElection extends Daemon {
}
}
- private int submitRequests(final long electionTerm, final TermIndex lastEntry) {
+ private int submitRequests(final long electionTerm, final TermIndex lastEntry,
+ Collection<RaftPeer> others, Executor voteExecutor) {
int submitted = 0;
for (final RaftPeer peer : others) {
final RequestVoteRequestProto r = server.createRequestVoteRequest(
peer.getId(), electionTerm, lastEntry);
- service.submit(
- () -> server.getServerRpc().requestVote(r));
+ voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
submitted++;
}
return submitted;
}
- private ResultAndTerm waitForResults(final long electionTerm,
- final int submitted) throws InterruptedException {
+ private ResultAndTerm waitForResults(final long electionTerm, final int submitted,
+ RaftConfiguration conf, Executor voteExecutor) throws InterruptedException {
final Timestamp timeout = Timestamp.currentTime().addTimeMs(server.getRandomTimeoutMs());
final List<RequestVoteReplyProto> responses = new ArrayList<>();
final List<Exception> exceptions = new ArrayList<>();
int waitForNum = submitted;
Collection<RaftPeerId> votedPeers = new ArrayList<>();
- while (waitForNum > 0 && running && server.isCandidate()) {
- final long waitTime = -timeout.elapsedTimeMs();
- if (waitTime <= 0) {
+ while (waitForNum > 0 && shouldRun(electionTerm)) {
+ final TimeDuration waitTime = timeout.elapsedTime().apply(n -> -n);
+ if (waitTime.isNonPositive()) {
return logAndReturn(Result.TIMEOUT, responses, exceptions, -1);
}
try {
- final Future<RequestVoteReplyProto> future = service.poll(
- waitTime, TimeUnit.MILLISECONDS);
+ final Future<RequestVoteReplyProto> future = voteExecutor.poll(waitTime);
if (future == null) {
continue; // poll timeout, continue to return Result.TIMEOUT
}
@@ -227,8 +274,7 @@ class LeaderElection extends Daemon {
}
}
} catch(ExecutionException e) {
- LOG.info("{} got exception when requesting votes: {}", server.getId(), e);
- LOG.trace("TRACE", e);
+ LogUtils.infoOrTrace(LOG, () -> this + " got exception when requesting votes", e);
exceptions.add(e);
}
waitForNum--;
@@ -236,4 +282,9 @@ class LeaderElection extends Daemon {
// received all the responses
return logAndReturn(Result.REJECTED, responses, exceptions, -1);
}
+
+ @Override
+ public String toString() {
+ return name;
+ }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index fe26d45..fb57b6c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -52,8 +52,6 @@ import java.util.stream.Collectors;
import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY;
import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER;
import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
-import static org.apache.ratis.util.LifeCycle.State.CLOSED;
-import static org.apache.ratis.util.LifeCycle.State.CLOSING;
import static org.apache.ratis.util.LifeCycle.State.NEW;
import static org.apache.ratis.util.LifeCycle.State.RUNNING;
import static org.apache.ratis.util.LifeCycle.State.STARTING;
@@ -269,7 +267,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
}
public boolean isAlive() {
- return !lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED);
+ return !lifeCycle.getCurrentState().isClosingOrClosed();
}
public boolean isFollower() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index 52f4b81..a27a086 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -132,7 +132,7 @@ class RoleInfo {
final LeaderElection election = leaderElection.getAndSet(null);
if (election != null) {
LOG.info("{}: shutdown {}", id, election.getClass().getSimpleName());
- election.stopRunning();
+ election.shutdown();
// no need to interrupt the election thread
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index ee4ede9..0a343b2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -41,6 +41,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import static org.apache.ratis.server.impl.RaftServerImpl.LOG;
@@ -65,10 +66,12 @@ public class ServerState implements Closeable {
private final long leaderElectionTimeoutMs;
/**
- * Latest term server has seen. initialized to 0 on first boot, increases
- * monotonically.
+ * Latest term server has seen.
+ * Initialized to 0 on first boot, increases monotonically.
+ *
+ * @see TermIndex#isValidTerm(int)
*/
- private volatile long currentTerm;
+ private final AtomicLong currentTerm = new AtomicLong();
/**
* The server ID of the leader for this term. Null means either there is
* no leader for this term yet or this server does not know who it is yet.
@@ -116,7 +119,7 @@ public class ServerState implements Closeable {
log = initLog(id, prop, lastApplied, this::setRaftConf);
RaftLog.Metadata metadata = log.loadMetadata();
- currentTerm = metadata.getTerm();
+ currentTerm.set(metadata.getTerm());
votedFor = metadata.getVotedFor();
stateMachineUpdater = new StateMachineUpdater(stateMachine, server, log,
@@ -203,12 +206,12 @@ public class ServerState implements Closeable {
}
public long getCurrentTerm() {
- return currentTerm;
+ return currentTerm.get();
}
boolean updateCurrentTerm(long newTerm) {
- if (newTerm > currentTerm) {
- currentTerm = newTerm;
+ final long current = currentTerm.getAndUpdate(curTerm -> Math.max(curTerm, newTerm));
+ if (newTerm > current) {
votedFor = null;
setLeader(null, "updateCurrentTerm");
return true;
@@ -230,11 +233,11 @@ public class ServerState implements Closeable {
long initElection() {
votedFor = selfId;
setLeader(null, "initElection");
- return ++currentTerm;
+ return currentTerm.incrementAndGet();
}
void persistMetadata() throws IOException {
- this.log.writeMetadata(currentTerm, votedFor);
+ this.log.writeMetadata(currentTerm.get(), votedFor);
}
/**
@@ -281,7 +284,7 @@ public class ServerState implements Closeable {
}
void appendLog(TransactionContext operation) throws StateMachineException {
- log.append(currentTerm, operation);
+ log.append(currentTerm.get(), operation);
Objects.requireNonNull(operation.getLogEntry());
}
@@ -291,9 +294,10 @@ public class ServerState implements Closeable {
* @return true if the check passes
*/
boolean recognizeLeader(RaftPeerId leaderId, long leaderTerm) {
- if (leaderTerm < currentTerm) {
+ final long current = currentTerm.get();
+ if (leaderTerm < current) {
return false;
- } else if (leaderTerm > currentTerm || this.leaderId == null) {
+ } else if (leaderTerm > current || this.leaderId == null) {
// If the request indicates a term that is greater than the current term
// or no leader has been set for the current term, make sure to update
// leader and term later
@@ -309,9 +313,10 @@ public class ServerState implements Closeable {
if (!getRaftConf().containsInConf(candidateId)) {
return false;
}
- if (candidateTerm > currentTerm) {
+ final long current = currentTerm.get();
+ if (candidateTerm > current) {
return true;
- } else if (candidateTerm == currentTerm) {
+ } else if (candidateTerm == current) {
// has not voted yet or this is a retry
return votedFor == null || votedFor.equals(candidateId);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java b/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
index fed14a3..ab9a030 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -31,6 +31,11 @@ public interface TermIndex extends Comparable<TermIndex> {
/** @return the index. */
long getIndex();
+ /** A term number is valid iff it is greater than zero. */
+ static boolean isValidTerm(int term) {
+ return term > 0;
+ }
+
/** Create a new {@link TermIndex} instance. */
static TermIndex newTermIndex(long term, long index) {
return ServerImplUtils.newTermIndex(term, index);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index e9df14e..0a4243e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,15 +17,26 @@
*/
package org.apache.ratis.server.simulation;
-import org.apache.ratis.protocol.*;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.protocol.GroupInfoRequest;
+import org.apache.ratis.protocol.GroupListRequest;
+import org.apache.ratis.protocol.GroupManagementRequest;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.impl.RaftServerProxy;
-import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -126,7 +137,7 @@ class SimulatedServerRpc implements RaftServerRpc {
= new RequestHandler.HandlerInterface<RaftServerRequest, RaftServerReply>() {
@Override
public boolean isAlive() {
- return !server.getLifeCycleState().isOneOf(LifeCycle.State.CLOSING, LifeCycle.State.CLOSED);
+ return !server.getLifeCycleState().isClosingOrClosed();
}
@Override
@@ -148,7 +159,7 @@ class SimulatedServerRpc implements RaftServerRpc {
= new RequestHandler.HandlerInterface<RaftClientRequest, RaftClientReply>() {
@Override
public boolean isAlive() {
- return !server.getLifeCycleState().isOneOf(LifeCycle.State.CLOSING, LifeCycle.State.CLOSED);
+ return !server.getLifeCycleState().isClosingOrClosed();
}
@Override