You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/04 14:52:12 UTC
[incubator-ratis] branch master updated: RATIS-1112. Ensure a node
doesn't get reelected as a leader if it voluntarily steps down. (#252)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9357fdb RATIS-1112. Ensure a node doesn't get reelected as a leader if it voluntarily steps down. (#252)
9357fdb is described below
commit 9357fdb8a8344b0bbda3f38291380f18f6be00d0
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Nov 4 22:52:03 2020 +0800
RATIS-1112. Ensure a node doesn't get reelected as a leader if it voluntarily steps down. (#252)
---
.../apache/ratis/server/RaftServerConfigKeys.java | 20 +++++++++--
.../apache/ratis/server/impl/FollowerState.java | 26 ++++++++++++--
.../org/apache/ratis/server/impl/LeaderState.java | 40 +++++++++++++---------
.../org/apache/ratis/server/impl/LogAppender.java | 2 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 14 +++++---
.../org/apache/ratis/server/impl/RoleInfo.java | 4 +--
.../ratis/server/impl/LeaderElectionTests.java | 21 ++++++++++++
7 files changed, 97 insertions(+), 30 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 8d5c312..5abb03f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -466,7 +466,7 @@ public interface RaftServerConfigKeys {
setTimeDuration(properties::setTimeDuration, SLEEP_TIME_KEY, sleepTime);
}
- String SLOWNESS_TIMEOUT_KEY = PREFIX + "slowness.timeout";
+ String SLOWNESS_TIMEOUT_KEY = PREFIX + ".slowness.timeout";
TimeDuration SLOWNESS_TIMEOUT_DEFAULT = TimeDuration.valueOf(60, TimeUnit.SECONDS);
static TimeDuration slownessTimeout(RaftProperties properties) {
return getTimeDuration(properties.getTimeDuration(SLOWNESS_TIMEOUT_DEFAULT.getUnit()),
@@ -502,8 +502,22 @@ public interface RaftServerConfigKeys {
return getTimeDuration(properties.getTimeDuration(NO_LEADER_TIMEOUT_DEFAULT.getUnit()),
NO_LEADER_TIMEOUT_KEY, NO_LEADER_TIMEOUT_DEFAULT, getDefaultLog());
}
- static void setNoLeaderTimeout(RaftProperties properties, TimeDuration leaderElectionTimeout) {
- setTimeDuration(properties::setTimeDuration, NO_LEADER_TIMEOUT_KEY, leaderElectionTimeout);
+ static void setNoLeaderTimeout(RaftProperties properties, TimeDuration noLeaderTimeout) {
+ setTimeDuration(properties::setTimeDuration, NO_LEADER_TIMEOUT_KEY, noLeaderTimeout);
+ }
+ }
+
+ interface LeaderElection {
+ String PREFIX = RaftServerConfigKeys.PREFIX + "." + LeaderElection.class.getSimpleName().toLowerCase();
+
+ String LEADER_STEP_DOWN_WAIT_TIME_KEY = PREFIX + ".leader.step-down.wait-time";
+ TimeDuration LEADER_STEP_DOWN_WAIT_TIME_DEFAULT = TimeDuration.valueOf(10, TimeUnit.SECONDS);
+ static TimeDuration leaderStepDownWaitTime(RaftProperties properties) {
+ return getTimeDuration(properties.getTimeDuration(LEADER_STEP_DOWN_WAIT_TIME_DEFAULT.getUnit()),
+ LEADER_STEP_DOWN_WAIT_TIME_KEY, LEADER_STEP_DOWN_WAIT_TIME_DEFAULT, getDefaultLog());
+ }
+ static void setLeaderStepDownWaitTime(RaftProperties properties, TimeDuration noLeaderTimeout) {
+ setTimeDuration(properties::setTimeDuration, LEADER_STEP_DOWN_WAIT_TIME_KEY, noLeaderTimeout);
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index 13ef103..b1d0d93 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -19,10 +19,12 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToIntFunction;
@@ -52,15 +54,18 @@ class FollowerState extends Daemon {
static final Logger LOG = LoggerFactory.getLogger(FollowerState.class);
private final String name;
+ private final Object reason;
private final RaftServerImpl server;
- private volatile Timestamp lastRpcTime = Timestamp.currentTime();
+ private final Timestamp creationTime = Timestamp.currentTime();
+ private volatile Timestamp lastRpcTime = creationTime;
private volatile boolean isRunning = true;
private final AtomicInteger outstandingOp = new AtomicInteger();
- FollowerState(RaftServerImpl server) {
+ FollowerState(RaftServerImpl server, Object reason) {
this.name = server.getMemberId() + "-" + getClass().getSimpleName();
this.server = server;
+ this.reason = reason;
}
void updateLastRpcTime(UpdateType type) {
@@ -88,6 +93,20 @@ class FollowerState extends Daemon {
this.isRunning = false;
}
+ boolean lostMajorityHeartbeatsRecently() {
+ if (reason != LeaderState.StepDownReason.LOST_MAJORITY_HEARTBEATS) {
+ return false;
+ }
+ final TimeDuration elapsed = creationTime.elapsedTime();
+ final TimeDuration waitTime = server.getLeaderStepDownWaitTime();
+ if (elapsed.compareTo(waitTime) >= 0) {
+ return false;
+ }
+ LOG.info("{}: Skipping leader election since it stepped down recently (elapsed = {} < waitTime = {})",
+ this, elapsed.to(TimeUnit.MILLISECONDS), waitTime);
+ return true;
+ }
+
@Override
public void run() {
long sleepDeviationThresholdMs = server.getSleepDeviationThresholdMs();
@@ -104,7 +123,8 @@ class FollowerState extends Daemon {
break;
}
synchronized (server) {
- if (outstandingOp.get() == 0 && lastRpcTime.elapsedTimeMs() >= electionTimeout) {
+ if (outstandingOp.get() == 0 && lastRpcTime.elapsedTimeMs() >= electionTimeout
+ && !lostMajorityHeartbeatsRecently()) {
LOG.info("{}: change to CANDIDATE, lastRpcTime:{}ms, electionTimeout:{}ms",
this, lastRpcTime.elapsedTimeMs(), electionTimeout);
server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 240be2b..58fbc08 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -68,6 +68,15 @@ public class LeaderState {
NOPROGRESS, PROGRESSING, CAUGHTUP
}
+ enum StepDownReason {
+ HIGHER_TERM, HIGHER_PRIORITY, LOST_MAJORITY_HEARTBEATS, STATE_MACHINE_EXCEPTION;
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":" + name();
+ }
+ }
+
static class StateUpdateEvent {
private enum Type {
STEP_DOWN, UPDATE_COMMIT, CHECK_STAGING
@@ -468,17 +477,17 @@ public class LeaderState {
stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollower().getPeer().getId()));
}
- void submitStepDownEvent() {
- submitStepDownEvent(getCurrentTerm());
+ void submitStepDownEvent(StepDownReason reason) {
+ submitStepDownEvent(getCurrentTerm(), reason);
}
- void submitStepDownEvent(long term) {
- eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN, term, () -> stepDown(term)));
+ void submitStepDownEvent(long term, StepDownReason reason) {
+ eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN, term, () -> stepDown(term, reason)));
}
- private void stepDown(long term) {
+ private void stepDown(long term, StepDownReason reason) {
try {
- server.changeToFollowerAndPersistMetadata(term, "stepDown");
+ server.changeToFollowerAndPersistMetadata(term, reason);
} catch(IOException e) {
final String s = this + ": Failed to persist metadata for term " + term;
LOG.warn(s, e);
@@ -490,7 +499,7 @@ public class LeaderState {
}
}
- private synchronized void stepDown(long term, TermIndex lastEntry) {
+ private synchronized void yieldLeaderToHigherPriorityPeer(long term, TermIndex lastEntry) {
ServerState state = server.getState();
TermIndex currLastEntry = state.getLastEntry();
if (ServerState.compareLog(currLastEntry, lastEntry) != 0) {
@@ -499,7 +508,7 @@ public class LeaderState {
return;
}
- stepDown(term);
+ stepDown(term, StepDownReason.HIGHER_PRIORITY);
}
private void prepare() {
@@ -820,8 +829,6 @@ public class LeaderState {
final RaftConfiguration conf = server.getRaftConf();
int leaderPriority = conf.getPeer(server.getId()).getPriority();
- TermIndex leaderLastEntry = server.getState().getLastEntry();
-
for (LogAppender logAppender : senders.getSenders()) {
FollowerInfo followerInfo = logAppender.getFollower();
RaftPeerId followerID = followerInfo.getPeer().getId();
@@ -831,13 +838,14 @@ public class LeaderState {
continue;
}
+ final TermIndex leaderLastEntry = server.getState().getLastEntry();
if (leaderLastEntry == null) {
LOG.info("{} stepDown leadership on term:{} because follower's priority:{} is higher than leader's:{} " +
"and leader's lastEntry is null",
this, currentTerm, followerPriority, leaderPriority);
// step down as follower
- stepDown(currentTerm, server.getState().getLastEntry());
+ yieldLeaderToHigherPriorityPeer(currentTerm, leaderLastEntry);
return;
}
@@ -848,7 +856,7 @@ public class LeaderState {
leaderLastEntry.getIndex());
// step down as follower
- stepDown(currentTerm, server.getState().getLastEntry());
+ yieldLeaderToHigherPriorityPeer(currentTerm, leaderLastEntry);
return;
}
}
@@ -887,16 +895,14 @@ public class LeaderState {
return;
}
- List<FollowerInfo> followers = senders.stream()
- .map(LogAppender::getFollower).collect(Collectors.toList());
-
LOG.warn(this + ": Lost leadership on term: " + currentTerm
+ ". Election timeout: " + server.getMaxTimeoutMs() + "ms"
+ ". In charge for: " + server.getRole().getRoleElapsedTimeMs() + "ms"
- + ". Conf: " + conf + ". Followers: " + followers);
+ + ". Conf: " + conf);
+ senders.stream().map(LogAppender::getFollower).forEach(f -> LOG.warn("Follower {}", f));
// step down as follower
- stepDown(currentTerm);
+ stepDown(currentTerm, StepDownReason.LOST_MAJORITY_HEARTBEATS);
}
void replyPendingRequest(long logIndex, RaftClientReply reply) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 03fd67d..1fa5919 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -602,7 +602,7 @@ public class LogAppender {
synchronized (server) {
if (isAppenderRunning() && follower.isAttendingVote()
&& responseTerm > leaderState.getCurrentTerm()) {
- leaderState.submitStepDownEvent(responseTerm);
+ leaderState.submitStepDownEvent(responseTerm, LeaderState.StepDownReason.HIGHER_TERM);
return true;
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index c30f017..7665bd2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -89,6 +89,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
private final StateMachine stateMachine;
private final int minTimeoutMs;
private final int maxTimeoutMs;
+ private final TimeDuration leaderStepDownWaitTime;
private final int rpcSlownessTimeoutMs;
private final int sleepDeviationThresholdMs;
private final boolean installSnapshotEnabled;
@@ -126,6 +127,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
minTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties).toIntExact(TimeUnit.MILLISECONDS);
maxTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMax(properties).toIntExact(TimeUnit.MILLISECONDS);
rpcSlownessTimeoutMs = RaftServerConfigKeys.Rpc.slownessTimeout(properties).toIntExact(TimeUnit.MILLISECONDS);
+ leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
sleepDeviationThresholdMs = RaftServerConfigKeys.sleepDeviationThreshold(properties);
installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs,
@@ -173,6 +175,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
return minTimeoutMs + ThreadLocalRandom.current().nextInt(maxTimeoutMs - minTimeoutMs + 1);
}
+ TimeDuration getLeaderStepDownWaitTime() {
+ return leaderStepDownWaitTime;
+ }
+
int getSleepDeviationThresholdMs() {
return sleepDeviationThresholdMs;
}
@@ -233,7 +239,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
*/
private void startAsFollower() {
setRole(RaftPeerRole.FOLLOWER, "startAsFollower");
- role.startFollowerState(this);
+ role.startFollowerState(this, "startAsFollower");
lifeCycle.transition(RUNNING);
}
@@ -401,7 +407,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
} else if (old == RaftPeerRole.FOLLOWER) {
role.shutdownFollowerState();
}
- role.startFollowerState(this);
+ role.startFollowerState(this, reason);
}
return metadataUpdated;
}
@@ -609,7 +615,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
cacheEntry.failWithReply(exceptionReply);
// leader will step down here
if (isLeader()) {
- leaderState.submitStepDownEvent();
+ leaderState.submitStepDownEvent(LeaderState.StepDownReason.STATE_MACHINE_EXCEPTION);
}
return CompletableFuture.completedFuture(exceptionReply);
}
@@ -1091,7 +1097,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
state.setLeader(leaderId, "appendEntries");
if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
- role.startFollowerState(this);
+ role.startFollowerState(this, Op.APPEND_ENTRIES);
}
followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index ae2a0df..74be175 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -111,8 +111,8 @@ class RoleInfo {
return Optional.ofNullable(followerState.get());
}
- void startFollowerState(RaftServerImpl server) {
- updateAndGet(followerState, new FollowerState(server)).start();
+ void startFollowerState(RaftServerImpl server, Object reason) {
+ updateAndGet(followerState, new FollowerState(server, reason)).start();
}
void shutdownFollowerState() {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index b4e718a..b76b057 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -97,6 +97,27 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
}
@Test
+ public void testLostMajorityHeartbeats() throws Exception {
+ runWithNewCluster(3, this::runTestLostMajorityHeartbeats);
+ }
+
+ void runTestLostMajorityHeartbeats(CLUSTER cluster) throws Exception {
+ final RaftServerImpl leader = waitForLeader(cluster);
+ try {
+ isolate(cluster, leader.getId());
+ Thread.sleep(leader.getMaxTimeoutMs());
+ Thread.sleep(leader.getMaxTimeoutMs());
+ final Optional<FollowerState> optional = leader.getRole().getFollowerState();
+ Assert.assertTrue(optional.isPresent());
+ final FollowerState followerState = optional.get();
+ Assert.assertTrue(followerState.lostMajorityHeartbeatsRecently());
+ } finally {
+ deIsolate(cluster, leader.getId());
+ }
+ }
+
+
+ @Test
public void testEnforceLeader() throws Exception {
LOG.info("Running testEnforceLeader");
final int numServer = 5;