You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/03/24 12:33:04 UTC

[ratis] branch master updated: RATIS-1558. Support Listener in RaftServerImpl (#627)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 77f21ae  RATIS-1558. Support Listener in RaftServerImpl (#627)
77f21ae is described below

commit 77f21ae82879120afdb1638a9cd0f3679ccbede1
Author: Yaolong Liu <ly...@163.com>
AuthorDate: Thu Mar 24 20:32:58 2022 +0800

    RATIS-1558. Support Listener in RaftServerImpl (#627)
---
 .../apache/ratis/server/impl/FollowerState.java    | 18 ++++++++++-----
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  2 ++
 .../apache/ratis/server/impl/RaftServerImpl.java   | 26 +++++++++++++++++-----
 3 files changed, 36 insertions(+), 10 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index 7f5e5a8..52ae033 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.leader.LeaderState;
 import org.apache.ratis.util.Daemon;
 import org.apache.ratis.util.JavaUtils;
@@ -109,10 +110,19 @@ class FollowerState extends Daemon {
     return true;
   }
 
+  private boolean shouldRun() {
+    final DivisionInfo info = server.getInfo();
+    final boolean run = isRunning && (info.isFollower() || info.isListener());
+    if (!run) {
+      LOG.info("{}: Stopping now (isRunning? {}, role = {})", this, isRunning, info.getCurrentRole());
+    }
+    return run;
+  }
+
   @Override
   public  void run() {
     final TimeDuration sleepDeviationThreshold = server.getSleepDeviationThreshold();
-    while (isRunning && server.getInfo().isFollower()) {
+    while (shouldRun()) {
       final TimeDuration electionTimeout = server.getRandomElectionTimeout();
       try {
         final TimeDuration extraSleep = electionTimeout.sleep();
@@ -122,14 +132,12 @@ class FollowerState extends Daemon {
           continue;
         }
 
-        final boolean isFollower = server.getInfo().isFollower();
-        if (!isRunning || !isFollower) {
-          LOG.info("{}: Stopping now (isRunning? {}, isFollower? {})", this, isRunning, isFollower);
+        if (!shouldRun()) {
           break;
         }
         synchronized (server) {
           if (outstandingOp.get() == 0
-              && isRunning
+              && isRunning && server.getInfo().isFollower()
               && lastRpcTime.elapsedTime().compareTo(electionTimeout) >= 0
               && !lostMajorityHeartbeatsRecently()) {
             LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}, electionTimeout:{}",
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 5390905..49ad684 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -706,6 +706,7 @@ class LeaderStateImpl implements LeaderState {
         applyOldNewConf();
         senders.stream()
             .map(LogAppender::getFollower)
+            .filter(f -> server.getRaftConf().containsInConf(f.getPeer().getId()))
             .map(FollowerInfoImpl.class::cast)
             .forEach(FollowerInfoImpl::startAttendVote);
       }
@@ -1100,6 +1101,7 @@ class LeaderStateImpl implements LeaderState {
   List<RaftPeer> getFollowers() {
     return Collections.unmodifiableList(senders.stream()
         .map(sender -> sender.getFollower().getPeer())
+        .filter(peer -> server.getRaftConf().containsInConf(peer.getId()))
         .collect(Collectors.toList()));
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 51447be..a62fad0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -313,7 +313,10 @@ class RaftServerImpl implements RaftServer.Division,
     final RaftConfigurationImpl conf = getRaftConf();
     if (conf != null && conf.containsInBothConfs(getId())) {
       LOG.info("{}: start as a follower, conf={}", getMemberId(), conf);
-      startAsFollower();
+      startAsPeer(RaftPeerRole.FOLLOWER);
+    } else if (conf != null && conf.containsInConf(getId(), RaftPeerRole.LISTENER)) {
+      LOG.info("{}: start as a listener, conf={}", getMemberId(), conf);
+      startAsPeer(RaftPeerRole.LISTENER);
     } else {
       LOG.info("{}: start with initializing state, conf={}", getMemberId(), conf);
       startInitializing();
@@ -335,11 +338,21 @@ class RaftServerImpl implements RaftServer.Division,
   }
 
   /**
-   * The peer belongs to the current configuration, should start as a follower
+   * The peer belongs to the current configuration, should start as a follower or listener
    */
-  private void startAsFollower() {
-    setRole(RaftPeerRole.FOLLOWER, "startAsFollower");
-    role.startFollowerState(this, "startAsFollower");
+  private void startAsPeer(RaftPeerRole newRole) {
+    Object reason = "";
+    if (newRole == RaftPeerRole.FOLLOWER) {
+      reason = "startAsFollower";
+      setRole(RaftPeerRole.FOLLOWER, reason);
+    } else if (newRole == RaftPeerRole.LISTENER) {
+      reason = "startAsListener";
+      setRole(RaftPeerRole.LISTENER, reason);
+    } else {
+      throw new IllegalArgumentException("Unexpected role " + newRole);
+    }
+    role.startFollowerState(this, reason);
+
     lifeCycle.transition(RUNNING);
   }
 
@@ -485,6 +498,9 @@ class RaftServerImpl implements RaftServer.Division,
    */
   private synchronized boolean changeToFollower(long newTerm, boolean force, Object reason) {
     final RaftPeerRole old = role.getCurrentRole();
+    if (old == RaftPeerRole.LISTENER) {
+      throw new IllegalStateException("Unexpected role " + old);
+    }
     final boolean metadataUpdated = state.updateCurrentTerm(newTerm);
 
     if (old != RaftPeerRole.FOLLOWER || force) {