You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/09/30 06:45:20 UTC
[ratis] 03/03: RATIS-1709 Support specify ThreadGroup for Daemon threads (#733)
This is an automated email from the ASF dual-hosted git repository.
dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 58389a6ef401ce5e575c0095932d983e42487937
Author: Jiacheng Liu <ji...@gmail.com>
AuthorDate: Wed Sep 21 15:53:36 2022 +0800
RATIS-1709 Support specify ThreadGroup for Daemon threads (#733)
(cherry picked from commit 31eff226ded5a0d8d1d64a50661a6c6260da84ef)
---
.../main/java/org/apache/ratis/util/Daemon.java | 8 +++++++-
.../java/org/apache/ratis/server/RaftServer.java | 23 ++++++++++++++++++----
.../apache/ratis/server/impl/FollowerState.java | 4 +++-
.../apache/ratis/server/impl/LeaderElection.java | 3 ++-
.../apache/ratis/server/impl/LeaderStateImpl.java | 7 ++++---
.../apache/ratis/server/impl/RaftServerImpl.java | 9 ++++++++-
.../apache/ratis/server/impl/RaftServerProxy.java | 8 +++++++-
.../apache/ratis/server/impl/ServerImplUtils.java | 10 +++++-----
.../ratis/server/impl/StateMachineUpdater.java | 4 ++--
.../ratis/server/leader/LogAppenderDaemon.java | 3 ++-
.../apache/ratis/server/impl/MiniRaftCluster.java | 2 +-
11 files changed, 60 insertions(+), 21 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
index 8e576a0ac..b3797fa7e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
@@ -26,7 +26,7 @@ public class Daemon extends Thread {
/** Construct a daemon thread with flexible arguments. */
protected Daemon(Builder builder) {
- super(builder.runnable);
+ super(builder.threadGroup, builder.runnable);
setName(builder.name);
}
@@ -38,6 +38,7 @@ public class Daemon extends Thread {
public static class Builder {
private String name;
private Runnable runnable;
+ private ThreadGroup threadGroup;
public Builder setName(String name) {
this.name = name;
@@ -49,6 +50,11 @@ public class Daemon extends Thread {
return this;
}
+ public Builder setThreadGroup(ThreadGroup threadGroup) {
+ this.threadGroup = threadGroup;
+ return this;
+ }
+
public Daemon build() {
Objects.requireNonNull(name, "name == null");
return new Daemon(this);
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
index e9719b96c..2c01e7aaa 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -116,6 +116,9 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the internal {@link RaftClient} of this division. */
RaftClient getRaftClient();
+ /** @return the {@link ThreadGroup} the threads of this Division belong to. */
+ ThreadGroup getThreadGroup();
+
@Override
void close();
}
@@ -168,7 +171,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
private static Method initNewRaftServerMethod() {
final String className = RaftServer.class.getPackage().getName() + ".impl.ServerImplUtils";
final Class<?>[] argClasses = {RaftPeerId.class, RaftGroup.class, StateMachine.Registry.class,
- RaftProperties.class, Parameters.class};
+ ThreadGroup.class, RaftProperties.class, Parameters.class};
try {
final Class<?> clazz = ReflectionUtils.getClassByName(className);
return clazz.getMethod("newRaftServer", argClasses);
@@ -178,11 +181,11 @@ public interface RaftServer extends Closeable, RpcType.Get,
}
private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group,
- StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters)
- throws IOException {
+ StateMachine.Registry stateMachineRegistry, ThreadGroup threadGroup,
+ RaftProperties properties, Parameters parameters) throws IOException {
try {
return (RaftServer) NEW_RAFT_SERVER_METHOD.invoke(null,
- serverId, group, stateMachineRegistry, properties, parameters);
+ serverId, group, stateMachineRegistry, threadGroup, properties, parameters);
} catch (IllegalAccessException e) {
throw new IllegalStateException("Failed to build " + serverId, e);
} catch (InvocationTargetException e) {
@@ -195,6 +198,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
private RaftGroup group = null;
private RaftProperties properties;
private Parameters parameters;
+ private ThreadGroup threadGroup;
/** @return a {@link RaftServer} object. */
public RaftServer build() throws IOException {
@@ -203,6 +207,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
group,
Objects.requireNonNull(stateMachineRegistry , "Neither 'stateMachine' nor 'setStateMachineRegistry' " +
"is initialized."),
+ threadGroup,
Objects.requireNonNull(properties, "The 'properties' field is not initialized."),
parameters);
}
@@ -241,5 +246,15 @@ public interface RaftServer extends Closeable, RpcType.Get,
this.parameters = parameters;
return this;
}
+
+ /**
+ * Set the {@link ThreadGroup} for the RaftServer threads so that the application can manage them as a whole.
+ * For example, configure {@link ThreadGroup#uncaughtException(Thread, Throwable)} for the whole thread group.
+ * If not set, a new thread group named after the server id is created and used instead.
+ */
+ public Builder setThreadGroup(ThreadGroup threadGroup) {
+ this.threadGroup = threadGroup;
+ return this;
+ }
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index afad7c559..3911e39a5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -64,7 +64,9 @@ class FollowerState extends Daemon {
private final AtomicInteger outstandingOp = new AtomicInteger();
FollowerState(RaftServerImpl server, Object reason) {
- super(newBuilder().setName(server.getMemberId() + "-" + JavaUtils.getClassSimpleName(FollowerState.class)));
+ super(newBuilder()
+ .setName(server.getMemberId() + "-" + JavaUtils.getClassSimpleName(FollowerState.class))
+ .setThreadGroup(server.getThreadGroup()));
this.server = server;
this.reason = reason;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index cc7f623e3..ced72604a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -191,7 +191,8 @@ class LeaderElection implements Runnable {
LeaderElection(RaftServerImpl server, boolean skipPreVote) {
this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()) + COUNT.incrementAndGet();
this.lifeCycle = new LifeCycle(this);
- this.daemon = Daemon.newBuilder().setName(name).setRunnable(this).build();
+ this.daemon = Daemon.newBuilder().setName(name).setRunnable(this)
+ .setThreadGroup(server.getThreadGroup()).build();
this.server = server;
this.skipPreVote = skipPreVote ||
!RaftServerConfigKeys.LeaderElection.preVote(
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index fe36a1a6a..e14653225 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -269,7 +269,7 @@ class LeaderStateImpl implements LeaderState {
this.currentTerm = state.getCurrentTerm();
this.eventQueue = new EventQueue();
- processor = new EventProcessor(this.name);
+ processor = new EventProcessor(this.name, server);
raftServerMetrics = server.getRaftServerMetrics();
logAppenderMetrics = new LogAppenderMetrics(server.getMemberId());
this.pendingRequests = new PendingRequests(server.getMemberId(), properties, raftServerMetrics);
@@ -617,8 +617,9 @@ class LeaderStateImpl implements LeaderState {
* state, such as changing to follower, or updating the committed index.
*/
private class EventProcessor extends Daemon {
- public EventProcessor(String name) {
- super(Daemon.newBuilder().setName(name));
+ public EventProcessor(String name, RaftServerImpl server) {
+ super(Daemon.newBuilder()
+ .setName(name).setThreadGroup(server.getThreadGroup()));
}
@Override
public void run() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 1994fa8de..2aa14cbed 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -97,7 +97,7 @@ import org.apache.ratis.util.function.CheckedSupplier;
class RaftServerImpl implements RaftServer.Division,
RaftServerProtocol, RaftServerAsynchronousProtocol,
- RaftClientProtocol, RaftClientAsynchronousProtocol{
+ RaftClientProtocol, RaftClientAsynchronousProtocol {
private static final String CLASS_NAME = JavaUtils.getClassSimpleName(RaftServerImpl.class);
static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
@@ -188,6 +188,7 @@ class RaftServerImpl implements RaftServer.Division,
private final ExecutorService clientExecutor;
private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
+ private final ThreadGroup threadGroup;
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException {
final RaftPeerId id = proxy.getId();
@@ -213,6 +214,7 @@ class RaftServerImpl implements RaftServer.Division,
getMemberId(), () -> commitInfoCache::get, retryCache::getStatistics);
this.startComplete = new AtomicBoolean(false);
+ this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString());
this.raftClient = JavaUtils.memoize(() -> RaftClient.newBuilder()
.setRaftGroup(group)
@@ -271,6 +273,11 @@ class RaftServerImpl implements RaftServer.Division,
return sleepDeviationThreshold;
}
+ @Override
+ public ThreadGroup getThreadGroup() {
+ return threadGroup;
+ }
+
@Override
public StateMachine getStateMachine() {
return stateMachine;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index b8cee7f53..bef72ee0d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -193,9 +193,10 @@ class RaftServerProxy implements RaftServer {
private final ExecutorService executor;
private final JvmPauseMonitor pauseMonitor;
+ private final ThreadGroup threadGroup;
RaftServerProxy(RaftPeerId id, StateMachine.Registry stateMachineRegistry,
- RaftProperties properties, Parameters parameters) {
+ RaftProperties properties, Parameters parameters, ThreadGroup threadGroup) {
this.properties = properties;
this.stateMachineRegistry = stateMachineRegistry;
@@ -218,6 +219,7 @@ class RaftServerProxy implements RaftServer {
final TimeDuration leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
this.pauseMonitor = new JvmPauseMonitor(id,
extraSleep -> handleJvmPause(extraSleep, rpcSlownessTimeout, leaderStepDownWaitTime));
+ this.threadGroup = threadGroup == null ? new ThreadGroup(this.id.toString()) : threadGroup;
}
private void handleJvmPause(TimeDuration extraSleep, TimeDuration closeThreshold, TimeDuration stepDownThreshold)
@@ -376,6 +378,10 @@ class RaftServerProxy implements RaftServer {
return lifeCycle.getCurrentState();
}
+ ThreadGroup getThreadGroup() {
+ return threadGroup;
+ }
+
@Override
public void start() throws IOException {
lifeCycle.startAndTransition(this::startImpl, IOException.class);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 6777b9093..6a29b4cbb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -46,26 +46,26 @@ public final class ServerImplUtils {
/** Create a {@link RaftServerProxy}. */
public static RaftServerProxy newRaftServer(
RaftPeerId id, RaftGroup group, StateMachine.Registry stateMachineRegistry,
- RaftProperties properties, Parameters parameters) throws IOException {
+ ThreadGroup threadGroup, RaftProperties properties, Parameters parameters) throws IOException {
RaftServer.LOG.debug("newRaftServer: {}, {}", id, group);
if (group != null && !group.getPeers().isEmpty()) {
Preconditions.assertNotNull(id, "RaftPeerId %s is not in RaftGroup %s", id, group);
Preconditions.assertNotNull(group.getPeer(id), "RaftPeerId %s is not in RaftGroup %s", id, group);
}
- final RaftServerProxy proxy = newRaftServer(id, stateMachineRegistry, properties, parameters);
+ final RaftServerProxy proxy = newRaftServer(id, stateMachineRegistry, threadGroup, properties, parameters);
proxy.initGroups(group);
return proxy;
}
private static RaftServerProxy newRaftServer(
- RaftPeerId id, StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters)
- throws IOException {
+ RaftPeerId id, StateMachine.Registry stateMachineRegistry, ThreadGroup threadGroup, RaftProperties properties,
+ Parameters parameters) throws IOException {
final TimeDuration sleepTime = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
final RaftServerProxy proxy;
try {
// attempt multiple times to avoid temporary bind exception
proxy = JavaUtils.attemptRepeatedly(
- () -> new RaftServerProxy(id, stateMachineRegistry, properties, parameters),
+ () -> new RaftServerProxy(id, stateMachineRegistry, properties, parameters, threadGroup),
5, sleepTime, "new RaftServerProxy", RaftServer.LOG);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 321d1d71a..bd62b65ac 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -109,8 +109,8 @@ class StateMachineUpdater implements Runnable {
}
};
this.purgeUptoSnapshotIndex = RaftServerConfigKeys.Log.purgeUptoSnapshotIndex(properties);
-
- updater = Daemon.newBuilder().setName(name).setRunnable(this).build();
+ updater = Daemon.newBuilder().setName(name).setRunnable(this)
+ .setThreadGroup(server.getThreadGroup()).build();
this.awaitForSignal = new AwaitForSignal(name);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
index d1688987d..d985a6ae8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
@@ -48,7 +48,8 @@ class LogAppenderDaemon {
this.logAppender = logAppender;
this.name = logAppender + "-" + JavaUtils.getClassSimpleName(getClass());
this.lifeCycle = new LifeCycle(name);
- this.daemon = Daemon.newBuilder().setName(name).setRunnable(this::run).build();
+ this.daemon = Daemon.newBuilder().setName(name).setRunnable(this::run)
+ .setThreadGroup(logAppender.getServer().getThreadGroup()).build();
}
public boolean isWorking() {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 64638a2e8..ec2c285e5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -387,7 +387,7 @@ public abstract class MiniRaftCluster implements Closeable {
}
final RaftProperties prop = new RaftProperties(properties);
RaftServerConfigKeys.setStorageDir(prop, Collections.singletonList(dir));
- return ServerImplUtils.newRaftServer(id, group, getStateMachineRegistry(prop), prop,
+ return ServerImplUtils.newRaftServer(id, group, getStateMachineRegistry(prop), null, prop,
setPropertiesAndInitParameters(id, group, prop));
} catch (IOException e) {
throw new RuntimeException(e);