You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/12/24 08:41:55 UTC
[ratis] branch master updated: RATIS-1761. If LeaderStateImpl is not running, it should not restart a LogAppender. (#799)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 77a9949f9 RATIS-1761. If LeaderStateImpl is not running, it should not restart a LogAppender. (#799)
77a9949f9 is described below
commit 77a9949f98d6c80a8c1466887763d44fb64c9ccc
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Sat Dec 24 16:41:50 2022 +0800
RATIS-1761. If LeaderStateImpl is not running, it should not restart a LogAppender. (#799)
---
.../java/org/apache/ratis/util/OpenCloseState.java | 4 +--
.../apache/ratis/server/impl/LeaderStateImpl.java | 34 +++++++++++++++++-----
.../ratis/server/leader/LogAppenderBase.java | 8 +++++
.../ratis/server/leader/LogAppenderDaemon.java | 6 ++--
4 files changed, 39 insertions(+), 13 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java b/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java
index 7847c21cd..b79b49b27 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -60,7 +60,7 @@ public class OpenCloseState {
final Throwable t = state.get();
if (!(t instanceof OpenTrace)) {
final String s = name + " is expected to be opened but it is " + toString(t);
- throw new IllegalArgumentException(s, t);
+ throw new IllegalStateException(s, t);
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 644b93fd0..2a16602b6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -78,6 +78,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
@@ -164,7 +165,7 @@ class LeaderStateImpl implements LeaderState {
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
String s = this + ": poll() is interrupted";
- if (!running) {
+ if (isStopped.get()) {
LOG.info(s + " gracefully");
return null;
} else {
@@ -318,7 +319,7 @@ class LeaderStateImpl implements LeaderState {
private final PendingRequests pendingRequests;
private final WatchRequests watchRequests;
private final MessageStreamRequests messageStreamRequests;
- private volatile boolean running = true;
+ private final AtomicBoolean isStopped = new AtomicBoolean();
private final int stagingCatchupGap;
private final long placeHolderIndex;
@@ -393,7 +394,10 @@ class LeaderStateImpl implements LeaderState {
}
void stop() {
- this.running = false;
+ if (!isStopped.compareAndSet(false, true)) {
+ LOG.info("{} is already stopped", this);
+ return;
+ }
// do not interrupt event processor since it may be in the middle of logSync
senders.forEach(LogAppender::stop);
final NotLeaderException nle = server.generateNotLeaderException();
@@ -440,7 +444,8 @@ class LeaderStateImpl implements LeaderState {
*/
PendingRequest startSetConfiguration(SetConfigurationRequest request, List<RaftPeer> peersInNewConf) {
LOG.info("{}: startSetConfiguration {}", this, request);
- Preconditions.assertTrue(running && !inStagingState());
+ Preconditions.assertTrue(isRunning(), () -> this + " is not running.");
+ Preconditions.assertTrue(!inStagingState(), () -> this + " is already in staging state " + stagingState);
final List<RaftPeer> listenersInNewConf = request.getArguments().getPeersInNewConf(RaftPeerRole.LISTENER);
final Collection<RaftPeer> peersToBootStrap = server.getRaftConf().filterNotContainedInConf(peersInNewConf);
@@ -595,8 +600,21 @@ class LeaderStateImpl implements LeaderState {
senders.removeAll(toStop);
}
+  /**
+   * Tells whether this leader state is still in service.
+   *
+   * @return false once {@code isStopped} has been set; otherwise true only if this object
+   *         is the very instance returned by {@code server.getRole().getLeaderState()}.
+   */
+  boolean isRunning() {
+    // Check the stopped flag first so that the server role is not consulted after a stop.
+    return !isStopped.get()
+        && server.getRole().getLeaderState().orElse(null) == this;
+  }
+
@Override
public void restart(LogAppender sender) {
+ if (!isRunning()) {
+ LOG.warn("Failed to restart {}: {} is not running", sender, this);
+ return;
+ }
+
final FollowerInfo info = sender.getFollower();
LOG.info("{}: Restarting {} for {}", this, JavaUtils.getClassSimpleName(sender.getClass()), info.getName());
sender.stop();
@@ -633,7 +651,7 @@ class LeaderStateImpl implements LeaderState {
LOG.warn(s, e);
// the failure should happen while changing the state to follower
// thus the in-memory state should have been updated
- if (running) {
+ if (!isStopped.get()) {
throw new IllegalStateException(s + " and running == true", e);
}
}
@@ -669,7 +687,7 @@ class LeaderStateImpl implements LeaderState {
private void prepare() {
synchronized (server) {
- if (running) {
+ if (isRunning()) {
final ServerState state = server.getState();
if (state.getRaftConf().isTransitional() && state.isConfCommitted()) {
// the configuration is in transitional state, and has been committed
@@ -694,10 +712,10 @@ class LeaderStateImpl implements LeaderState {
// apply an empty message; check if necessary to replicate (new) conf
prepare();
- while (running) {
+ while (isRunning()) {
final StateUpdateEvent event = eventQueue.poll();
synchronized(server) {
- if (running) {
+ if (isRunning()) {
if (event != null) {
event.execute();
} else if (inStagingState()) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index fda78fbcf..bc8a31181 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -125,6 +125,14 @@ public abstract class LogAppenderBase implements LogAppender {
daemon.tryToClose();
}
+  /**
+   * Asks the leader state to restart this appender.
+   * When {@code server.getInfo().isAlive()} is false, nothing is restarted
+   * and only a warning is logged.
+   */
+  void restart() {
+    if (server.getInfo().isAlive()) {
+      getLeaderState().restart(this);
+    } else {
+      LOG.warn("Failed to restart {}: server {} is not alive", this, server.getMemberId());
+    }
+  }
+
@Override
public final FollowerInfo getFollower() {
return follower;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
index d985a6ae8..6ca237ecf 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
@@ -42,9 +42,9 @@ class LogAppenderDaemon {
private final LifeCycle lifeCycle;
private final Daemon daemon;
- private final LogAppender logAppender;
+ private final LogAppenderBase logAppender;
- LogAppenderDaemon(LogAppender logAppender) {
+ LogAppenderDaemon(LogAppenderBase logAppender) {
this.logAppender = logAppender;
this.name = logAppender + "-" + JavaUtils.getClassSimpleName(getClass());
this.lifeCycle = new LifeCycle(name);
@@ -88,7 +88,7 @@ class LogAppenderDaemon {
lifeCycle.transitionIfValid(EXCEPTION);
} finally {
if (lifeCycle.transitionAndGet(TRANSITION_FINALLY) == EXCEPTION) {
- logAppender.getLeaderState().restart(logAppender);
+ logAppender.restart();
}
}
}