You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/12/24 08:41:55 UTC

[ratis] branch master updated: RATIS-1761. If LeaderStateImpl is not running, it should not restart a LogAppender. (#799)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 77a9949f9 RATIS-1761. If LeaderStateImpl is not running, it should not restart a LogAppender. (#799)
77a9949f9 is described below

commit 77a9949f98d6c80a8c1466887763d44fb64c9ccc
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Sat Dec 24 16:41:50 2022 +0800

    RATIS-1761. If LeaderStateImpl is not running, it should not restart a LogAppender. (#799)
---
 .../java/org/apache/ratis/util/OpenCloseState.java |  4 +--
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 34 +++++++++++++++++-----
 .../ratis/server/leader/LogAppenderBase.java       |  8 +++++
 .../ratis/server/leader/LogAppenderDaemon.java     |  6 ++--
 4 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java b/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java
index 7847c21cd..b79b49b27 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -60,7 +60,7 @@ public class OpenCloseState {
     final Throwable t = state.get();
     if (!(t instanceof OpenTrace)) {
       final String s = name + " is expected to be opened but it is " + toString(t);
-      throw new IllegalArgumentException(s, t);
+      throw new IllegalStateException(s, t);
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 644b93fd0..2a16602b6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -78,6 +78,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 import java.util.function.Predicate;
@@ -164,7 +165,7 @@ class LeaderStateImpl implements LeaderState {
       } catch (InterruptedException ie) {
         Thread.currentThread().interrupt();
         String s = this + ": poll() is interrupted";
-        if (!running) {
+        if (isStopped.get()) {
           LOG.info(s + " gracefully");
           return null;
         } else {
@@ -318,7 +319,7 @@ class LeaderStateImpl implements LeaderState {
   private final PendingRequests pendingRequests;
   private final WatchRequests watchRequests;
   private final MessageStreamRequests messageStreamRequests;
-  private volatile boolean running = true;
+  private final AtomicBoolean isStopped = new AtomicBoolean();
 
   private final int stagingCatchupGap;
   private final long placeHolderIndex;
@@ -393,7 +394,10 @@ class LeaderStateImpl implements LeaderState {
   }
 
   void stop() {
-    this.running = false;
+    if (!isStopped.compareAndSet(false, true)) {
+      LOG.info("{} is already stopped", this);
+      return;
+    }
     // do not interrupt event processor since it may be in the middle of logSync
     senders.forEach(LogAppender::stop);
     final NotLeaderException nle = server.generateNotLeaderException();
@@ -440,7 +444,8 @@ class LeaderStateImpl implements LeaderState {
    */
   PendingRequest startSetConfiguration(SetConfigurationRequest request, List<RaftPeer> peersInNewConf) {
     LOG.info("{}: startSetConfiguration {}", this, request);
-    Preconditions.assertTrue(running && !inStagingState());
+    Preconditions.assertTrue(isRunning(), () -> this + " is not running.");
+    Preconditions.assertTrue(!inStagingState(), () -> this + " is already in staging state " + stagingState);
 
     final List<RaftPeer> listenersInNewConf = request.getArguments().getPeersInNewConf(RaftPeerRole.LISTENER);
     final Collection<RaftPeer> peersToBootStrap = server.getRaftConf().filterNotContainedInConf(peersInNewConf);
@@ -595,8 +600,21 @@ class LeaderStateImpl implements LeaderState {
     senders.removeAll(toStop);
   }
 
+  boolean isRunning() {
+    if (isStopped.get()) {
+      return false;
+    }
+    final LeaderStateImpl current = server.getRole().getLeaderState().orElse(null);
+    return this == current;
+  }
+
   @Override
   public void restart(LogAppender sender) {
+    if (!isRunning()) {
+      LOG.warn("Failed to restart {}: {} is not running", sender, this);
+      return;
+    }
+
     final FollowerInfo info = sender.getFollower();
     LOG.info("{}: Restarting {} for {}", this, JavaUtils.getClassSimpleName(sender.getClass()), info.getName());
     sender.stop();
@@ -633,7 +651,7 @@ class LeaderStateImpl implements LeaderState {
       LOG.warn(s, e);
       // the failure should happen while changing the state to follower
       // thus the in-memory state should have been updated
-      if (running) {
+      if (!isStopped.get()) {
         throw new IllegalStateException(s + " and running == true", e);
       }
     }
@@ -669,7 +687,7 @@ class LeaderStateImpl implements LeaderState {
 
   private void prepare() {
     synchronized (server) {
-      if (running) {
+      if (isRunning()) {
         final ServerState state = server.getState();
         if (state.getRaftConf().isTransitional() && state.isConfCommitted()) {
           // the configuration is in transitional state, and has been committed
@@ -694,10 +712,10 @@ class LeaderStateImpl implements LeaderState {
       // apply an empty message; check if necessary to replicate (new) conf
       prepare();
 
-      while (running) {
+      while (isRunning()) {
         final StateUpdateEvent event = eventQueue.poll();
         synchronized(server) {
-          if (running) {
+          if (isRunning()) {
             if (event != null) {
               event.execute();
             } else if (inStagingState()) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index fda78fbcf..bc8a31181 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -125,6 +125,14 @@ public abstract class LogAppenderBase implements LogAppender {
     daemon.tryToClose();
   }
 
+  void restart() {
+    if (!server.getInfo().isAlive()) {
+      LOG.warn("Failed to restart {}: server {} is not alive", this, server.getMemberId());
+      return;
+    }
+    getLeaderState().restart(this);
+  }
+
   @Override
   public final FollowerInfo getFollower() {
     return follower;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
index d985a6ae8..6ca237ecf 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
@@ -42,9 +42,9 @@ class LogAppenderDaemon {
   private final LifeCycle lifeCycle;
   private final Daemon daemon;
 
-  private final LogAppender logAppender;
+  private final LogAppenderBase logAppender;
 
-  LogAppenderDaemon(LogAppender logAppender) {
+  LogAppenderDaemon(LogAppenderBase logAppender) {
     this.logAppender = logAppender;
     this.name = logAppender + "-" + JavaUtils.getClassSimpleName(getClass());
     this.lifeCycle = new LifeCycle(name);
@@ -88,7 +88,7 @@ class LogAppenderDaemon {
       lifeCycle.transitionIfValid(EXCEPTION);
     } finally {
       if (lifeCycle.transitionAndGet(TRANSITION_FINALLY) == EXCEPTION) {
-        logAppender.getLeaderState().restart(logAppender);
+        logAppender.restart();
       }
     }
   }