You are viewing a plain text version of this content. The canonical version is available at the original archive page; its hyperlink was not preserved in this plain-text rendering.
Posted to commits@ratis.apache.org by sz...@apache.org on 2019/09/17 19:43:44 UTC

[incubator-ratis] branch master updated: RATIS-655. Change LeaderState and FollowerState and to use RaftGroupMemberId.

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c9308f6  RATIS-655. Change LeaderState and FollowerState and to use RaftGroupMemberId.
c9308f6 is described below

commit c9308f64d14c49d4edd9eb116853de7c46794ed9
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Sep 17 12:43:13 2019 -0700

    RATIS-655. Change LeaderState and FollowerState and to use RaftGroupMemberId.
---
 .../apache/ratis/server/impl/FollowerState.java    | 22 ++++----
 .../apache/ratis/server/impl/LeaderElection.java   | 18 +++----
 .../org/apache/ratis/server/impl/LeaderState.java  | 61 ++++++++++++++--------
 .../apache/ratis/server/impl/PendingRequests.java  |  4 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   | 12 ++---
 .../test/java/org/apache/ratis/RaftTestUtil.java   |  4 +-
 6 files changed, 68 insertions(+), 53 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index c20b1a8..d82af65 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -51,13 +51,15 @@ class FollowerState extends Daemon {
 
   static final Logger LOG = LoggerFactory.getLogger(FollowerState.class);
 
+  private final String name;
   private final RaftServerImpl server;
 
   private volatile Timestamp lastRpcTime = Timestamp.currentTime();
-  private volatile boolean monitorRunning = true;
+  private volatile boolean isRunning = true;
   private final AtomicInteger outstandingOp = new AtomicInteger();
 
   FollowerState(RaftServerImpl server) {
+    this.name = server.getMemberId() + "-" + getClass().getSimpleName();
     this.server = server;
   }
 
@@ -66,8 +68,7 @@ class FollowerState extends Daemon {
 
     final int n = type.update(outstandingOp);
     if (LOG.isTraceEnabled()) {
-      LOG.trace("{}: update lastRpcTime to {} for {}, outstandingOp={}",
-          server.getId(), lastRpcTime, type, n);
+      LOG.trace("{}: update lastRpcTime to {} for {}, outstandingOp={}", this, lastRpcTime, type, n);
     }
   }
 
@@ -84,27 +85,28 @@ class FollowerState extends Daemon {
   }
 
   void stopRunning() {
-    this.monitorRunning = false;
+    this.isRunning = false;
   }
 
   @Override
   public  void run() {
     long sleepDeviationThresholdMs = server.getSleepDeviationThresholdMs();
-    while (monitorRunning && server.isFollower()) {
+    while (isRunning && server.isFollower()) {
       final long electionTimeout = server.getRandomTimeoutMs();
       try {
         if (!JavaUtils.sleep(electionTimeout, sleepDeviationThresholdMs)) {
           continue;
         }
 
-        if (!monitorRunning || !server.isFollower()) {
-          LOG.info("{} heartbeat monitor quit", server.getId());
+        final boolean isFollower = server.isFollower();
+        if (!isRunning || !isFollower) {
+          LOG.info("{}: Stopping now (isRunning? {}, isFollower? {})", this, isRunning, isFollower);
           break;
         }
         synchronized (server) {
           if (outstandingOp.get() == 0 && lastRpcTime.elapsedTimeMs() >= electionTimeout) {
-            LOG.info("{}:{} changes to CANDIDATE, lastRpcTime:{}, electionTimeout:{}ms",
-                server.getId(), server.getGroupId(), lastRpcTime.elapsedTimeMs(), electionTimeout);
+            LOG.info("{}: change to CANDIDATE, lastRpcTime:{}ms, electionTimeout:{}ms",
+                this, lastRpcTime.elapsedTimeMs(), electionTimeout);
             server.getLeaderElectionMetricsRegistry().onLeaderElectionTimeout(); // Update timeout metric counters.
             // election timeout, should become a candidate
             server.changeToCandidate();
@@ -123,6 +125,6 @@ class FollowerState extends Daemon {
 
   @Override
   public String toString() {
-    return server.getId() + ": " + getClass().getSimpleName();
+    return name;
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 3c81348..2c77627 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -110,7 +110,7 @@ class LeaderElection implements Runnable {
   private final RaftServerImpl server;
 
   LeaderElection(RaftServerImpl server) {
-    this.name = server.getMemberId() + ":" + getClass().getSimpleName() + COUNT.incrementAndGet();
+    this.name = server.getMemberId() + "-" + getClass().getSimpleName() + COUNT.incrementAndGet();
     this.lifeCycle = new LifeCycle(this);
     this.daemon = new Daemon(this);
     this.server = server;
@@ -133,19 +133,15 @@ class LeaderElection implements Runnable {
       askForVotes();
     } catch(Throwable e) {
       final LifeCycle.State state = lifeCycle.getCurrentState();
-      final String message = "Failed " + this + ", state=" + state;
-
       if (state.isClosingOrClosed()) {
-        LogUtils.infoOrTrace(LOG, message, e);
         LOG.info("{}: {} is safely ignored since this is already {}",
-            this, e.getClass().getSimpleName(), state);
+            this, e.getClass().getSimpleName(), state, e);
       } else {
         if (!server.isAlive()) {
-          LogUtils.infoOrTrace(LOG, message, e);
           LOG.info("{}: {} is safely ignored since the server is not alive: {}",
-              this, e.getClass().getSimpleName(), server);
+              this, e.getClass().getSimpleName(), server, e);
         } else {
-          LOG.error(message, e);
+          LOG.error("{}: Failed, state={}", this, state, e);
         }
         shutdown();
       }
@@ -264,8 +260,10 @@ class LeaderElection implements Runnable {
         final RaftPeerId replierId = RaftPeerId.valueOf(r.getServerReply().getReplyId());
         final RequestVoteReplyProto previous = responses.putIfAbsent(replierId, r);
         if (previous != null) {
-          LOG.warn("{} received duplicated replies from {}, the 2nd reply is ignored: 1st = {}, 2nd = {}",
-              server.getId(), replierId, ServerProtoUtils.toString(previous), ServerProtoUtils.toString(r));
+          if (LOG.isWarnEnabled()) {
+            LOG.warn("{} received duplicated replies from {}, the 2nd reply is ignored: 1st={}, 2nd={}",
+                this, replierId, ServerProtoUtils.toString(previous), ServerProtoUtils.toString(r));
+          }
           continue;
         }
         if (r.getShouldShutdown()) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 8750d29..8b8b50a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -99,13 +99,14 @@ public class LeaderState {
   }
 
   private class EventQueue {
+    private final String name = server.getMemberId() + "-" + getClass().getSimpleName();
     private final BlockingQueue<StateUpdateEvent> queue = new ArrayBlockingQueue<>(4096);
 
     void submit(StateUpdateEvent event) {
       try {
         queue.put(event);
       } catch (InterruptedException e) {
-        LOG.info("{}: Interrupted when submitting {} ", server.getId(), event);
+        LOG.info("{}: Interrupted when submitting {} ", this, event);
       }
     }
 
@@ -114,7 +115,7 @@ public class LeaderState {
       try {
         e = queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS);
       } catch(InterruptedException ie) {
-        String s = server.getId() + ": " + getClass().getSimpleName() + " thread is interrupted";
+        String s = this + ": poll() is interrupted";
         if (!running) {
           LOG.info(s + " gracefully");
           return null;
@@ -129,6 +130,11 @@ public class LeaderState {
       }
       return e;
     }
+
+    @Override
+    public String toString() {
+      return name;
+    }
   }
 
   /**
@@ -174,6 +180,7 @@ public class LeaderState {
   private final StateUpdateEvent CHECK_STAGING_EVENT =
       new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING, -1, this::checkStaging);
 
+  private final String name;
   private final RaftServerImpl server;
   private final RaftLog raftLog;
   private final long currentTerm;
@@ -185,7 +192,7 @@ public class LeaderState {
    * The list is protected by the RaftServer's lock.
    */
   private final SenderList senders;
-  private final EventQueue eventQueue = new EventQueue();
+  private final EventQueue eventQueue;
   private final EventProcessor processor;
   private final PendingRequests pendingRequests;
   private final WatchRequests watchRequests;
@@ -197,6 +204,7 @@ public class LeaderState {
   private final HeartbeatMetrics heartbeatMetrics;
 
   LeaderState(RaftServerImpl server, RaftProperties properties) {
+    this.name = server.getMemberId() + "-" + getClass().getSimpleName();
     this.server = server;
 
     stagingCatchupGap = RaftServerConfigKeys.stagingCatchupGap(properties);
@@ -205,9 +213,11 @@ public class LeaderState {
     final ServerState state = server.getState();
     this.raftLog = state.getLog();
     this.currentTerm = state.getCurrentTerm();
+
+    this.eventQueue = new EventQueue();
     processor = new EventProcessor();
-    this.pendingRequests = new PendingRequests(server.getId(), properties);
-    this.watchRequests = new WatchRequests(server.getId(), properties);
+    this.pendingRequests = new PendingRequests(server.getMemberId(), properties);
+    this.watchRequests = new WatchRequests(server.getMemberId(), properties);
 
     final RaftConfiguration conf = server.getRaftConf();
     Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
@@ -249,9 +259,9 @@ public class LeaderState {
       server.getStateMachine().notifyNotLeader(transactions);
       watchRequests.failWatches(nle);
     } catch (IOException e) {
-      LOG.warn(server.getId() + ": Caught exception in sendNotLeaderResponses", e);
+      LOG.warn("{}: Caught exception in sendNotLeaderResponses", this, e);
     }
-    server.getServerRpc().notifyNotLeader(server.getGroupId());
+    server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId());
   }
 
   void notifySenders() {
@@ -274,6 +284,7 @@ public class LeaderState {
    * Start bootstrapping new peers
    */
   PendingRequest startSetConfiguration(SetConfigurationRequest request) {
+    LOG.info("{}: startSetConfiguration {}", this, request);
     Preconditions.assertTrue(running && !inStagingState());
 
     final List<RaftPeer> peersInNewConf = request.getPeersInNewConf();
@@ -303,14 +314,14 @@ public class LeaderState {
 
   PendingRequest addPendingRequest(PendingRequests.Permit permit, RaftClientRequest request, TransactionContext entry) {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("{}: addPendingRequest at {}, entry=", server.getId(), request,
+      LOG.debug("{}: addPendingRequest at {}, entry={}", this, request,
           ServerProtoUtils.toLogEntryString(entry.getLogEntry()));
     }
     return pendingRequests.add(permit, request, entry);
   }
 
   CompletableFuture<RaftClientReply> addWatchReqeust(RaftClientRequest request) {
-    LOG.debug("{}: addWatchRequest {}", server.getId(), request);
+    LOG.debug("{}: addWatchRequest {}", this, request);
     return watchRequests.add(request)
         .thenApply(v -> new RaftClientReply(request, server.getCommitInfos()))
         .exceptionally(e -> {
@@ -395,7 +406,7 @@ public class LeaderState {
 
   void restartSender(LogAppender sender) {
     final FollowerInfo follower = sender.getFollower();
-    LOG.info("{}: Restarting {} for {}", server.getId(), sender.getClass().getSimpleName(), follower.getName());
+    LOG.info("{}: Restarting {} for {}", this, sender.getClass().getSimpleName(), follower.getName());
     senders.removeAll(Collections.singleton(sender));
     addAndStartSenders(Collections.singleton(follower.getPeer()));
   }
@@ -420,7 +431,7 @@ public class LeaderState {
     try {
       server.changeToFollowerAndPersistMetadata(term, "stepDown");
     } catch(IOException e) {
-      final String s = server.getId() + ": Failed to persist metadata for term " + term;
+      final String s = this + ": Failed to persist metadata for term " + term;
       LOG.warn(s, e);
       // the failure should happen while changing the state to follower
       // thus the in-memory state should have been updated
@@ -484,8 +495,7 @@ public class LeaderState {
     final Timestamp progressTime = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
     final Timestamp timeoutTime = Timestamp.currentTime().addTimeMs(-3*server.getMaxTimeoutMs());
     if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) {
-      LOG.debug("{} detects a follower {} timeout for bootstrapping," +
-              " timeoutTime: {}", server.getId(), follower, timeoutTime);
+      LOG.debug("{} detects a follower {} timeout ({}) for bootstrapping", this, follower, timeoutTime);
       return BootStrapProgress.NOPROGRESS;
     } else if (follower.getMatchIndex() + stagingCatchupGap > committed
         && follower.getLastRpcResponseTime().compareTo(progressTime) > 0) {
@@ -516,8 +526,7 @@ public class LeaderState {
           .getLastCommittedIndex();
       Collection<BootStrapProgress> reports = checkAllProgress(committedIndex);
       if (reports.contains(BootStrapProgress.NOPROGRESS)) {
-        LOG.debug("{} fails the setConfiguration request", server.getId());
-        stagingState.fail();
+        stagingState.fail(BootStrapProgress.NOPROGRESS);
       } else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
         // all caught up!
         applyOldNewConf();
@@ -640,8 +649,7 @@ public class LeaderState {
         pendingRequests.replySetConfiguration(server::getCommitInfos);
         // if the leader is not included in the current configuration, step down
         if (!conf.containsInConf(server.getId())) {
-          LOG.info("{} is not included in the new configuration {}. Step down.",
-              server.getId(), conf);
+          LOG.info("{} is not included in the new configuration {}. Will shutdown server...", this, conf);
           try {
             // leave some time for all RPC senders to send out new conf entry
             Thread.sleep(server.getMinTimeoutMs());
@@ -725,6 +733,7 @@ public class LeaderState {
   }
 
   private class ConfigurationStagingState {
+    private final String name = server.getMemberId() + "-" + getClass().getSimpleName();
     private final Map<RaftPeerId, RaftPeer> newPeers;
     private final PeerConfiguration newConf;
 
@@ -755,14 +764,19 @@ public class LeaderState {
       return newPeers.containsKey(peerId);
     }
 
-    void fail() {
+    void fail(BootStrapProgress progress) {
+      final String message = this + ": Fail to set configuration " + newConf + " due to " + progress;
+      LOG.debug(message);
       stopAndRemoveSenders(s -> !s.getFollower().isAttendingVote());
 
       LeaderState.this.stagingState = null;
       // send back failure response to client's request
-      pendingRequests.failSetConfiguration(
-          new ReconfigurationTimeoutException("Fail to set configuration "
-              + newConf + ". Timeout when bootstrapping new peers."));
+      pendingRequests.failSetConfiguration(new ReconfigurationTimeoutException(message));
+    }
+
+    @Override
+    public String toString() {
+      return name;
     }
   }
 
@@ -787,4 +801,9 @@ public class LeaderState {
   void recordFollowerHeartbeatElapsedTime(String followerId, long elapsedTime) {
     heartbeatMetrics.recordFollowerHeartbeatElapsedTime(followerId, elapsedTime);
   }
+
+  @Override
+  public String toString() {
+    return name;
+  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 46f0c1f..7fa03b8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -24,7 +24,7 @@ import org.apache.ratis.protocol.NotLeaderException;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftException;
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.statemachine.TransactionContext;
@@ -135,7 +135,7 @@ class PendingRequests {
   private final String name;
   private final RequestMap pendingRequests;
 
-  PendingRequests(RaftPeerId id, RaftProperties properties) {
+  PendingRequests(RaftGroupMemberId id, RaftProperties properties) {
     this.name = id + "-" + getClass().getSimpleName();
     this.pendingRequests = new RequestMap(id, RaftServerConfigKeys.Write.elementLimit(properties));
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 8395b06..bc4e69a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -149,10 +149,6 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     return sleepDeviationThresholdMs;
   }
 
-  public RaftGroupId getGroupId() {
-    return getMemberId().getGroupId();
-  }
-
   public StateMachine getStateMachine() {
     return stateMachine;
   }
@@ -189,7 +185,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       startInitializing();
     }
 
-    registerMBean(getId(), getGroupId(), jmxAdapter, jmxAdapter);
+    registerMBean(getId(), getMemberId().getGroupId(), jmxAdapter, jmxAdapter);
     state.start();
     return true;
   }
@@ -243,7 +239,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
   }
 
   RaftGroup getGroup() {
-    return RaftGroup.valueOf(getGroupId(), getRaftConf().getPeers());
+    return RaftGroup.valueOf(getMemberId().getGroupId(), getRaftConf().getPeers());
   }
 
   public void shutdown(boolean deleteDirectory) {
@@ -474,7 +470,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
   }
 
   void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws GroupMismatchException {
-    final RaftGroupId groupId = getGroupId();
+    final RaftGroupId groupId = getMemberId().getGroupId();
     if (!groupId.equals(requestorGroupId)) {
       throw new GroupMismatchException(getMemberId()
           + ": The group (" + requestorGroupId + ") of " + requestorId
@@ -1329,7 +1325,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
 
     @Override
     public String getGroupId() {
-      return RaftServerImpl.this.getGroupId().toString();
+      return getMemberId().getGroupId().toString();
     }
 
     @Override
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 5b26419..f202207 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -180,12 +180,12 @@ public interface RaftTestUtil {
   }
 
   static void assertLogEntries(RaftServerImpl server, long expectedTerm, SimpleMessage... expectedMessages) {
-    LOG.info("checking raft log for " + server.getId());
+    LOG.info("checking raft log for {}", server.getMemberId());
     final RaftLog log = server.getState().getLog();
     try {
       RaftTestUtil.assertLogEntries(log, expectedTerm, expectedMessages);
     } catch (AssertionError e) {
-      LOG.error(server.getId() + ": Unexpected raft log", e);
+      LOG.error("Unexpected raft log in {}", server.getMemberId(), e);
       throw e;
     }
   }