You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/04 14:52:12 UTC

[incubator-ratis] branch master updated: RATIS-1112. Ensure a node doesn't get reelected as a leader if it voluntarily steps down. (#252)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9357fdb  RATIS-1112. Ensure a node doesn't get reelected as a leader if it voluntarily steps down. (#252)
9357fdb is described below

commit 9357fdb8a8344b0bbda3f38291380f18f6be00d0
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Nov 4 22:52:03 2020 +0800

    RATIS-1112. Ensure a node doesn't get reelected as a leader if it voluntarily steps down. (#252)
---
 .../apache/ratis/server/RaftServerConfigKeys.java  | 20 +++++++++--
 .../apache/ratis/server/impl/FollowerState.java    | 26 ++++++++++++--
 .../org/apache/ratis/server/impl/LeaderState.java  | 40 +++++++++++++---------
 .../org/apache/ratis/server/impl/LogAppender.java  |  2 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   | 14 +++++---
 .../org/apache/ratis/server/impl/RoleInfo.java     |  4 +--
 .../ratis/server/impl/LeaderElectionTests.java     | 21 ++++++++++++
 7 files changed, 97 insertions(+), 30 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 8d5c312..5abb03f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -466,7 +466,7 @@ public interface RaftServerConfigKeys {
       setTimeDuration(properties::setTimeDuration, SLEEP_TIME_KEY, sleepTime);
     }
 
-    String SLOWNESS_TIMEOUT_KEY = PREFIX + "slowness.timeout";
+    String SLOWNESS_TIMEOUT_KEY = PREFIX + ".slowness.timeout"; // the "." separator was missing
     TimeDuration SLOWNESS_TIMEOUT_DEFAULT = TimeDuration.valueOf(60, TimeUnit.SECONDS);
     static TimeDuration slownessTimeout(RaftProperties properties) {
       return getTimeDuration(properties.getTimeDuration(SLOWNESS_TIMEOUT_DEFAULT.getUnit()),
@@ -502,8 +502,22 @@ public interface RaftServerConfigKeys {
       return getTimeDuration(properties.getTimeDuration(NO_LEADER_TIMEOUT_DEFAULT.getUnit()),
           NO_LEADER_TIMEOUT_KEY, NO_LEADER_TIMEOUT_DEFAULT, getDefaultLog());
     }
-    static void setNoLeaderTimeout(RaftProperties properties, TimeDuration leaderElectionTimeout) {
-      setTimeDuration(properties::setTimeDuration, NO_LEADER_TIMEOUT_KEY, leaderElectionTimeout);
+    static void setNoLeaderTimeout(RaftProperties properties, TimeDuration noLeaderTimeout) {
+      setTimeDuration(properties::setTimeDuration, NO_LEADER_TIMEOUT_KEY, noLeaderTimeout);
+    }
+  }
+
+  interface LeaderElection { // wait-time: how long a leader that lost majority heartbeats skips elections
+    String PREFIX = RaftServerConfigKeys.PREFIX + "." + LeaderElection.class.getSimpleName().toLowerCase();
+
+    String LEADER_STEP_DOWN_WAIT_TIME_KEY = PREFIX + ".leader.step-down.wait-time";
+    TimeDuration LEADER_STEP_DOWN_WAIT_TIME_DEFAULT = TimeDuration.valueOf(10, TimeUnit.SECONDS);
+    static TimeDuration leaderStepDownWaitTime(RaftProperties properties) {
+      return getTimeDuration(properties.getTimeDuration(LEADER_STEP_DOWN_WAIT_TIME_DEFAULT.getUnit()),
+          LEADER_STEP_DOWN_WAIT_TIME_KEY, LEADER_STEP_DOWN_WAIT_TIME_DEFAULT, getDefaultLog());
+    }
+    static void setLeaderStepDownWaitTime(RaftProperties properties, TimeDuration leaderStepDownWaitTime) {
+      setTimeDuration(properties::setTimeDuration, LEADER_STEP_DOWN_WAIT_TIME_KEY, leaderStepDownWaitTime);
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index 13ef103..b1d0d93 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -19,10 +19,12 @@ package org.apache.ratis.server.impl;
 
 import org.apache.ratis.util.Daemon;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.Timestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.ToIntFunction;
 
@@ -52,15 +54,18 @@ class FollowerState extends Daemon {
   static final Logger LOG = LoggerFactory.getLogger(FollowerState.class);
 
   private final String name;
+  private final Object reason; // why this follower state was started, e.g. a LeaderState.StepDownReason
   private final RaftServerImpl server;
 
-  private volatile Timestamp lastRpcTime = Timestamp.currentTime();
+  private final Timestamp creationTime = Timestamp.currentTime(); // start of the step-down wait window
+  private volatile Timestamp lastRpcTime = creationTime;
   private volatile boolean isRunning = true;
   private final AtomicInteger outstandingOp = new AtomicInteger();
 
-  FollowerState(RaftServerImpl server) {
+  FollowerState(RaftServerImpl server, Object reason) {
     this.name = server.getMemberId() + "-" + getClass().getSimpleName();
     this.server = server;
+    this.reason = reason;
   }
 
   void updateLastRpcTime(UpdateType type) {
@@ -88,6 +93,20 @@ class FollowerState extends Daemon {
     this.isRunning = false;
   }
 
+  boolean lostMajorityHeartbeatsRecently() { // true: stepped down for LOST_MAJORITY_HEARTBEATS within the wait time
+    if (reason != LeaderState.StepDownReason.LOST_MAJORITY_HEARTBEATS) {
+      return false;
+    }
+    final TimeDuration elapsed = creationTime.elapsedTime();
+    final TimeDuration waitTime = server.getLeaderStepDownWaitTime();
+    if (elapsed.compareTo(waitTime) >= 0) {
+      return false;
+    }
+    LOG.info("{}: Skipping leader election since it stepped down recently (elapsed = {} < waitTime = {})",
+        this, elapsed.to(TimeUnit.MILLISECONDS), waitTime);
+    return true;
+  }
+
   @Override
   public  void run() {
     long sleepDeviationThresholdMs = server.getSleepDeviationThresholdMs();
@@ -104,7 +123,8 @@ class FollowerState extends Daemon {
           break;
         }
         synchronized (server) {
-          if (outstandingOp.get() == 0 && lastRpcTime.elapsedTimeMs() >= electionTimeout) {
+          if (outstandingOp.get() == 0 && lastRpcTime.elapsedTimeMs() >= electionTimeout
+              && !lostMajorityHeartbeatsRecently()) { // don't campaign right after losing majority heartbeats
             LOG.info("{}: change to CANDIDATE, lastRpcTime:{}ms, electionTimeout:{}ms",
                 this, lastRpcTime.elapsedTimeMs(), electionTimeout);
             server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 240be2b..58fbc08 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -68,6 +68,15 @@ public class LeaderState {
     NOPROGRESS, PROGRESSING, CAUGHTUP
   }
 
+  enum StepDownReason { // passed to changeToFollowerAndPersistMetadata as the step-down reason
+    HIGHER_TERM, HIGHER_PRIORITY, LOST_MAJORITY_HEARTBEATS, STATE_MACHINE_EXCEPTION;
+
+    @Override
+    public String toString() {
+      return getDeclaringClass().getSimpleName() + ":" + name(); // e.g. "StepDownReason:HIGHER_TERM"
+    }
+  }
+
   static class StateUpdateEvent {
     private enum Type {
       STEP_DOWN, UPDATE_COMMIT, CHECK_STAGING
@@ -468,17 +477,17 @@ public class LeaderState {
     stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollower().getPeer().getId()));
   }
 
-  void submitStepDownEvent() {
-    submitStepDownEvent(getCurrentTerm());
+  void submitStepDownEvent(StepDownReason reason) {
+    submitStepDownEvent(getCurrentTerm(), reason);
   }
 
-  void submitStepDownEvent(long term) {
-    eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN, term, () -> stepDown(term)));
+  void submitStepDownEvent(long term, StepDownReason reason) {
+    eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN, term, () -> stepDown(term, reason)));
   }
 
-  private void stepDown(long term) {
+  private void stepDown(long term, StepDownReason reason) {
     try {
-      server.changeToFollowerAndPersistMetadata(term, "stepDown");
+      server.changeToFollowerAndPersistMetadata(term, reason);
     } catch(IOException e) {
       final String s = this + ": Failed to persist metadata for term " + term;
       LOG.warn(s, e);
@@ -490,7 +499,7 @@ public class LeaderState {
     }
   }
 
-  private synchronized void stepDown(long term, TermIndex lastEntry) {
+  private synchronized void yieldLeaderToHigherPriorityPeer(long term, TermIndex lastEntry) { // no-op if log changed
     ServerState state = server.getState();
     TermIndex currLastEntry = state.getLastEntry();
     if (ServerState.compareLog(currLastEntry, lastEntry) != 0) {
@@ -499,7 +508,7 @@ public class LeaderState {
       return;
     }
 
-    stepDown(term);
+    stepDown(term, StepDownReason.HIGHER_PRIORITY);
   }
 
   private void prepare() {
@@ -820,8 +829,6 @@ public class LeaderState {
     final RaftConfiguration conf = server.getRaftConf();
     int leaderPriority = conf.getPeer(server.getId()).getPriority();
 
-    TermIndex leaderLastEntry = server.getState().getLastEntry();
-
     for (LogAppender logAppender : senders.getSenders()) {
       FollowerInfo followerInfo = logAppender.getFollower();
       RaftPeerId followerID = followerInfo.getPeer().getId();
@@ -831,13 +838,14 @@ public class LeaderState {
         continue;
       }
 
+      final TermIndex leaderLastEntry = server.getState().getLastEntry(); // re-read for each follower
       if (leaderLastEntry == null) {
         LOG.info("{} stepDown leadership on term:{} because follower's priority:{} is higher than leader's:{} " +
                 "and leader's lastEntry is null",
             this, currentTerm, followerPriority, leaderPriority);
 
         // step down as follower
-        stepDown(currentTerm, server.getState().getLastEntry());
+        yieldLeaderToHigherPriorityPeer(currentTerm, leaderLastEntry);
         return;
       }
 
@@ -848,7 +856,7 @@ public class LeaderState {
             leaderLastEntry.getIndex());
 
         // step down as follower
-        stepDown(currentTerm, server.getState().getLastEntry());
+        yieldLeaderToHigherPriorityPeer(currentTerm, leaderLastEntry);
         return;
       }
     }
@@ -887,16 +895,14 @@ public class LeaderState {
       return;
     }
 
-    List<FollowerInfo> followers = senders.stream()
-        .map(LogAppender::getFollower).collect(Collectors.toList());
-
     LOG.warn(this + ": Lost leadership on term: " + currentTerm
         + ". Election timeout: " + server.getMaxTimeoutMs() + "ms"
         + ". In charge for: " + server.getRole().getRoleElapsedTimeMs() + "ms"
-        + ". Conf: " + conf + ". Followers: " + followers);
+        + ". Conf: " + conf);
+    senders.stream().map(LogAppender::getFollower).forEach(f -> LOG.warn("{}: Follower {}", this, f));
 
     // step down as follower
-    stepDown(currentTerm);
+    stepDown(currentTerm, StepDownReason.LOST_MAJORITY_HEARTBEATS); // FollowerState delays re-election
   }
 
   void replyPendingRequest(long logIndex, RaftClientReply reply) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 03fd67d..1fa5919 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -602,7 +602,7 @@ public class LogAppender {
     synchronized (server) {
       if (isAppenderRunning() && follower.isAttendingVote()
           && responseTerm > leaderState.getCurrentTerm()) {
-        leaderState.submitStepDownEvent(responseTerm);
+        leaderState.submitStepDownEvent(responseTerm, LeaderState.StepDownReason.HIGHER_TERM); // newer term seen
         return true;
       }
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index c30f017..7665bd2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -89,6 +89,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
   private final StateMachine stateMachine;
   private final int minTimeoutMs;
   private final int maxTimeoutMs;
+  private final TimeDuration leaderStepDownWaitTime; // from RaftServerConfigKeys.LeaderElection
   private final int rpcSlownessTimeoutMs;
   private final int sleepDeviationThresholdMs;
   private final boolean installSnapshotEnabled;
@@ -126,6 +127,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     minTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties).toIntExact(TimeUnit.MILLISECONDS);
     maxTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMax(properties).toIntExact(TimeUnit.MILLISECONDS);
     rpcSlownessTimeoutMs = RaftServerConfigKeys.Rpc.slownessTimeout(properties).toIntExact(TimeUnit.MILLISECONDS);
+    leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
     sleepDeviationThresholdMs = RaftServerConfigKeys.sleepDeviationThreshold(properties);
     installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
     Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs,
@@ -173,6 +175,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     return minTimeoutMs + ThreadLocalRandom.current().nextInt(maxTimeoutMs - minTimeoutMs + 1);
   }
 
+  TimeDuration getLeaderStepDownWaitTime() { // read by FollowerState.lostMajorityHeartbeatsRecently()
+    return leaderStepDownWaitTime;
+  }
+
   int getSleepDeviationThresholdMs() {
     return sleepDeviationThresholdMs;
   }
@@ -233,7 +239,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
    */
   private void startAsFollower() {
     setRole(RaftPeerRole.FOLLOWER, "startAsFollower");
-    role.startFollowerState(this);
+    role.startFollowerState(this, "startAsFollower");
     lifeCycle.transition(RUNNING);
   }
 
@@ -401,7 +407,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       } else if (old == RaftPeerRole.FOLLOWER) {
         role.shutdownFollowerState();
       }
-      role.startFollowerState(this);
+      role.startFollowerState(this, reason);
     }
     return metadataUpdated;
   }
@@ -609,7 +615,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
         cacheEntry.failWithReply(exceptionReply);
         // leader will step down here
         if (isLeader()) {
-          leaderState.submitStepDownEvent();
+          leaderState.submitStepDownEvent(LeaderState.StepDownReason.STATE_MACHINE_EXCEPTION);
         }
         return CompletableFuture.completedFuture(exceptionReply);
       }
@@ -1091,7 +1097,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       state.setLeader(leaderId, "appendEntries");
 
       if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
-        role.startFollowerState(this);
+        role.startFollowerState(this, Op.APPEND_ENTRIES); // follower state started by an appendEntries call
       }
       followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START);
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index ae2a0df..74be175 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -111,8 +111,8 @@ class RoleInfo {
     return Optional.ofNullable(followerState.get());
   }
 
-  void startFollowerState(RaftServerImpl server) {
-    updateAndGet(followerState, new FollowerState(server)).start();
+  void startFollowerState(RaftServerImpl server, Object reason) { // reason is kept by the new FollowerState
+    updateAndGet(followerState, new FollowerState(server, reason)).start();
   }
 
   void shutdownFollowerState() {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index b4e718a..b76b057 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -97,6 +97,27 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
   }
 
   @Test
+  public void testLostMajorityHeartbeats() throws Exception {
+    runWithNewCluster(3, this::runTestLostMajorityHeartbeats);
+  }
+
+  void runTestLostMajorityHeartbeats(CLUSTER cluster) throws Exception {
+    final RaftServerImpl leader = waitForLeader(cluster);
+    try {
+      isolate(cluster, leader.getId());
+      // wait two max election timeouts so the isolated leader detects lost heartbeats and steps down
+      Thread.sleep(2L * leader.getMaxTimeoutMs());
+      final Optional<FollowerState> optional = leader.getRole().getFollowerState();
+      Assert.assertTrue("isolated leader should have a FollowerState after stepping down", optional.isPresent());
+      final FollowerState followerState = optional.get();
+      Assert.assertTrue("re-election should still be suppressed", followerState.lostMajorityHeartbeatsRecently());
+    } finally {
+      deIsolate(cluster, leader.getId());
+    }
+  }
+
+
+  @Test
   public void testEnforceLeader() throws Exception {
     LOG.info("Running testEnforceLeader");
     final int numServer = 5;