You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/09/30 06:45:20 UTC

[ratis] 03/03: RATIS-1709 Support specify ThreadGroup for Daemon threads (#733)

This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 58389a6ef401ce5e575c0095932d983e42487937
Author: Jiacheng Liu <ji...@gmail.com>
AuthorDate: Wed Sep 21 15:53:36 2022 +0800

    RATIS-1709 Support specify ThreadGroup for Daemon threads (#733)
    
    (cherry picked from commit 31eff226ded5a0d8d1d64a50661a6c6260da84ef)
---
 .../main/java/org/apache/ratis/util/Daemon.java    |  8 +++++++-
 .../java/org/apache/ratis/server/RaftServer.java   | 23 ++++++++++++++++++----
 .../apache/ratis/server/impl/FollowerState.java    |  4 +++-
 .../apache/ratis/server/impl/LeaderElection.java   |  3 ++-
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  7 ++++---
 .../apache/ratis/server/impl/RaftServerImpl.java   |  9 ++++++++-
 .../apache/ratis/server/impl/RaftServerProxy.java  |  8 +++++++-
 .../apache/ratis/server/impl/ServerImplUtils.java  | 10 +++++-----
 .../ratis/server/impl/StateMachineUpdater.java     |  4 ++--
 .../ratis/server/leader/LogAppenderDaemon.java     |  3 ++-
 .../apache/ratis/server/impl/MiniRaftCluster.java  |  2 +-
 11 files changed, 60 insertions(+), 21 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
index 8e576a0ac..b3797fa7e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
@@ -26,7 +26,7 @@ public class Daemon extends Thread {
 
   /** Construct a daemon thread with flexible arguments. */
   protected Daemon(Builder builder) {
-    super(builder.runnable);
+    super(builder.threadGroup, builder.runnable);
     setName(builder.name);
   }
 
@@ -38,6 +38,7 @@ public class Daemon extends Thread {
   public static class Builder {
     private String name;
     private Runnable runnable;
+    private ThreadGroup threadGroup;
 
     public Builder setName(String name) {
       this.name = name;
@@ -49,6 +50,11 @@ public class Daemon extends Thread {
       return this;
     }
 
+    public Builder setThreadGroup(ThreadGroup threadGroup) {
+      this.threadGroup = threadGroup;
+      return this;
+    }
+
     public Daemon build() {
       Objects.requireNonNull(name, "name == null");
       return new Daemon(this);
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
index e9719b96c..2c01e7aaa 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -116,6 +116,9 @@ public interface RaftServer extends Closeable, RpcType.Get,
     /** @return the internal {@link RaftClient} of this division. */
     RaftClient getRaftClient();
 
+    /** @return the {@link ThreadGroup} the threads of this Division belong to. */
+    ThreadGroup getThreadGroup();
+
     @Override
     void close();
   }
@@ -168,7 +171,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
     private static Method initNewRaftServerMethod() {
       final String className = RaftServer.class.getPackage().getName() + ".impl.ServerImplUtils";
       final Class<?>[] argClasses = {RaftPeerId.class, RaftGroup.class, StateMachine.Registry.class,
-          RaftProperties.class, Parameters.class};
+          ThreadGroup.class, RaftProperties.class, Parameters.class};
       try {
         final Class<?> clazz = ReflectionUtils.getClassByName(className);
         return clazz.getMethod("newRaftServer", argClasses);
@@ -178,11 +181,11 @@ public interface RaftServer extends Closeable, RpcType.Get,
     }
 
     private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group,
-        StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters)
-        throws IOException {
+        StateMachine.Registry stateMachineRegistry, ThreadGroup threadGroup,
+        RaftProperties properties, Parameters parameters) throws IOException {
       try {
         return (RaftServer) NEW_RAFT_SERVER_METHOD.invoke(null,
-            serverId, group, stateMachineRegistry, properties, parameters);
+            serverId, group, stateMachineRegistry, threadGroup, properties, parameters);
       } catch (IllegalAccessException e) {
         throw new IllegalStateException("Failed to build " + serverId, e);
       } catch (InvocationTargetException e) {
@@ -195,6 +198,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
     private RaftGroup group = null;
     private RaftProperties properties;
     private Parameters parameters;
+    private ThreadGroup threadGroup;
 
     /** @return a {@link RaftServer} object. */
     public RaftServer build() throws IOException {
@@ -203,6 +207,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
           group,
           Objects.requireNonNull(stateMachineRegistry , "Neither 'stateMachine' nor 'setStateMachineRegistry' " +
               "is initialized."),
+          threadGroup,
           Objects.requireNonNull(properties, "The 'properties' field is not initialized."),
           parameters);
     }
@@ -241,5 +246,15 @@ public interface RaftServer extends Closeable, RpcType.Get,
       this.parameters = parameters;
       return this;
     }
+
+    /**
+     * Set {@link ThreadGroup} so the application can control RaftServer threads consistently with the application.
+     * For example, configure {@link ThreadGroup#uncaughtException(Thread, Throwable)} for the whole thread group.
+     * If not set, the new thread will be put into the thread group of the caller thread.
+     */
+    public Builder setThreadGroup(ThreadGroup threadGroup) {
+      this.threadGroup = threadGroup;
+      return this;
+    }
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index afad7c559..3911e39a5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -64,7 +64,9 @@ class FollowerState extends Daemon {
   private final AtomicInteger outstandingOp = new AtomicInteger();
 
   FollowerState(RaftServerImpl server, Object reason) {
-    super(newBuilder().setName(server.getMemberId() + "-" + JavaUtils.getClassSimpleName(FollowerState.class)));
+    super(newBuilder()
+        .setName(server.getMemberId() + "-" + JavaUtils.getClassSimpleName(FollowerState.class))
+        .setThreadGroup(server.getThreadGroup()));
     this.server = server;
     this.reason = reason;
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index cc7f623e3..ced72604a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -191,7 +191,8 @@ class LeaderElection implements Runnable {
   LeaderElection(RaftServerImpl server, boolean skipPreVote) {
     this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()) + COUNT.incrementAndGet();
     this.lifeCycle = new LifeCycle(this);
-    this.daemon = Daemon.newBuilder().setName(name).setRunnable(this).build();
+    this.daemon = Daemon.newBuilder().setName(name).setRunnable(this)
+        .setThreadGroup(server.getThreadGroup()).build();
     this.server = server;
     this.skipPreVote = skipPreVote ||
         !RaftServerConfigKeys.LeaderElection.preVote(
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index fe36a1a6a..e14653225 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -269,7 +269,7 @@ class LeaderStateImpl implements LeaderState {
     this.currentTerm = state.getCurrentTerm();
 
     this.eventQueue = new EventQueue();
-    processor = new EventProcessor(this.name);
+    processor = new EventProcessor(this.name, server);
     raftServerMetrics = server.getRaftServerMetrics();
     logAppenderMetrics = new LogAppenderMetrics(server.getMemberId());
     this.pendingRequests = new PendingRequests(server.getMemberId(), properties, raftServerMetrics);
@@ -617,8 +617,9 @@ class LeaderStateImpl implements LeaderState {
    * state, such as changing to follower, or updating the committed index.
    */
   private class EventProcessor extends Daemon {
-    public EventProcessor(String name) {
-      super(Daemon.newBuilder().setName(name));
+    public EventProcessor(String name, RaftServerImpl server) {
+      super(Daemon.newBuilder()
+          .setName(name).setThreadGroup(server.getThreadGroup()));
     }
     @Override
     public void run() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 1994fa8de..2aa14cbed 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -97,7 +97,7 @@ import org.apache.ratis.util.function.CheckedSupplier;
 
 class RaftServerImpl implements RaftServer.Division,
     RaftServerProtocol, RaftServerAsynchronousProtocol,
-    RaftClientProtocol, RaftClientAsynchronousProtocol{
+    RaftClientProtocol, RaftClientAsynchronousProtocol {
   private static final String CLASS_NAME = JavaUtils.getClassSimpleName(RaftServerImpl.class);
   static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
   static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
@@ -188,6 +188,7 @@ class RaftServerImpl implements RaftServer.Division,
   private final ExecutorService clientExecutor;
 
   private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
+  private final ThreadGroup threadGroup;
 
   RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException {
     final RaftPeerId id = proxy.getId();
@@ -213,6 +214,7 @@ class RaftServerImpl implements RaftServer.Division,
         getMemberId(), () -> commitInfoCache::get, retryCache::getStatistics);
 
     this.startComplete = new AtomicBoolean(false);
+    this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString());
 
     this.raftClient = JavaUtils.memoize(() -> RaftClient.newBuilder()
         .setRaftGroup(group)
@@ -271,6 +273,11 @@ class RaftServerImpl implements RaftServer.Division,
     return sleepDeviationThreshold;
   }
 
+  @Override
+  public ThreadGroup getThreadGroup() {
+    return threadGroup;
+  }
+
   @Override
   public StateMachine getStateMachine() {
     return stateMachine;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index b8cee7f53..bef72ee0d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -193,9 +193,10 @@ class RaftServerProxy implements RaftServer {
   private final ExecutorService executor;
 
   private final JvmPauseMonitor pauseMonitor;
+  private final ThreadGroup threadGroup;
 
   RaftServerProxy(RaftPeerId id, StateMachine.Registry stateMachineRegistry,
-      RaftProperties properties, Parameters parameters) {
+      RaftProperties properties, Parameters parameters, ThreadGroup threadGroup) {
     this.properties = properties;
     this.stateMachineRegistry = stateMachineRegistry;
 
@@ -218,6 +219,7 @@ class RaftServerProxy implements RaftServer {
     final TimeDuration leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
     this.pauseMonitor = new JvmPauseMonitor(id,
         extraSleep -> handleJvmPause(extraSleep, rpcSlownessTimeout, leaderStepDownWaitTime));
+    this.threadGroup = threadGroup == null ? new ThreadGroup(this.id.toString()) : threadGroup;
   }
 
   private void handleJvmPause(TimeDuration extraSleep, TimeDuration closeThreshold, TimeDuration stepDownThreshold)
@@ -376,6 +378,10 @@ class RaftServerProxy implements RaftServer {
     return lifeCycle.getCurrentState();
   }
 
+  ThreadGroup getThreadGroup() {
+    return threadGroup;
+  }
+
   @Override
   public void start() throws IOException {
     lifeCycle.startAndTransition(this::startImpl, IOException.class);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 6777b9093..6a29b4cbb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -46,26 +46,26 @@ public final class ServerImplUtils {
   /** Create a {@link RaftServerProxy}. */
   public static RaftServerProxy newRaftServer(
       RaftPeerId id, RaftGroup group, StateMachine.Registry stateMachineRegistry,
-      RaftProperties properties, Parameters parameters) throws IOException {
+      ThreadGroup threadGroup, RaftProperties properties, Parameters parameters) throws IOException {
     RaftServer.LOG.debug("newRaftServer: {}, {}", id, group);
     if (group != null && !group.getPeers().isEmpty()) {
       Preconditions.assertNotNull(id, "RaftPeerId %s is not in RaftGroup %s", id, group);
       Preconditions.assertNotNull(group.getPeer(id), "RaftPeerId %s is not in RaftGroup %s", id, group);
     }
-    final RaftServerProxy proxy = newRaftServer(id, stateMachineRegistry, properties, parameters);
+    final RaftServerProxy proxy = newRaftServer(id, stateMachineRegistry, threadGroup, properties, parameters);
     proxy.initGroups(group);
     return proxy;
   }
 
   private static RaftServerProxy newRaftServer(
-      RaftPeerId id, StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters)
-      throws IOException {
+      RaftPeerId id, StateMachine.Registry stateMachineRegistry, ThreadGroup threadGroup, RaftProperties properties,
+      Parameters parameters) throws IOException {
     final TimeDuration sleepTime = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
     final RaftServerProxy proxy;
     try {
       // attempt multiple times to avoid temporary bind exception
       proxy = JavaUtils.attemptRepeatedly(
-          () -> new RaftServerProxy(id, stateMachineRegistry, properties, parameters),
+          () -> new RaftServerProxy(id, stateMachineRegistry, properties, parameters, threadGroup),
           5, sleepTime, "new RaftServerProxy", RaftServer.LOG);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 321d1d71a..bd62b65ac 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -109,8 +109,8 @@ class StateMachineUpdater implements Runnable {
       }
     };
     this.purgeUptoSnapshotIndex = RaftServerConfigKeys.Log.purgeUptoSnapshotIndex(properties);
-
-    updater = Daemon.newBuilder().setName(name).setRunnable(this).build();
+    updater = Daemon.newBuilder().setName(name).setRunnable(this)
+        .setThreadGroup(server.getThreadGroup()).build();
     this.awaitForSignal = new AwaitForSignal(name);
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
index d1688987d..d985a6ae8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
@@ -48,7 +48,8 @@ class LogAppenderDaemon {
     this.logAppender = logAppender;
     this.name = logAppender + "-" + JavaUtils.getClassSimpleName(getClass());
     this.lifeCycle = new LifeCycle(name);
-    this.daemon = Daemon.newBuilder().setName(name).setRunnable(this::run).build();
+    this.daemon = Daemon.newBuilder().setName(name).setRunnable(this::run)
+        .setThreadGroup(logAppender.getServer().getThreadGroup()).build();
   }
 
   public boolean isWorking() {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 64638a2e8..ec2c285e5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -387,7 +387,7 @@ public abstract class MiniRaftCluster implements Closeable {
       }
       final RaftProperties prop = new RaftProperties(properties);
       RaftServerConfigKeys.setStorageDir(prop, Collections.singletonList(dir));
-      return ServerImplUtils.newRaftServer(id, group, getStateMachineRegistry(prop), prop,
+      return ServerImplUtils.newRaftServer(id, group, getStateMachineRegistry(prop), null, prop,
           setPropertiesAndInitParameters(id, group, prop));
     } catch (IOException e) {
       throw new RuntimeException(e);