You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/09/14 11:30:50 UTC

[ratis] branch master updated: RATIS-1695. Use a Builder for Daemon (#747)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new a04bf69ca RATIS-1695. Use a Builder for Daemon (#747)
a04bf69ca is described below

commit a04bf69cacec68e81c39a4f5db92e9eb73dd0cb7
Author: Jiacheng Liu <ji...@gmail.com>
AuthorDate: Wed Sep 14 19:30:44 2022 +0800

    RATIS-1695. Use a Builder for Daemon (#747)
---
 .../main/java/org/apache/ratis/util/Daemon.java    | 37 ++++++++++++++++------
 .../org/apache/ratis/util/JvmPauseMonitor.java     |  5 ++-
 .../org/apache/ratis/util/TimeoutScheduler.java    |  6 ++--
 .../apache/ratis/server/impl/FollowerState.java    |  6 ++--
 .../apache/ratis/server/impl/LeaderElection.java   |  5 +--
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  2 +-
 .../ratis/server/impl/StateMachineUpdater.java     |  2 +-
 .../ratis/server/leader/LogAppenderDaemon.java     |  2 +-
 .../apache/ratis/server/impl/MiniRaftCluster.java  |  5 ++-
 .../ratis/server/simulation/RequestHandler.java    |  1 +
 .../server/simulation/SimulatedServerRpc.java      |  5 ++-
 .../statemachine/SimpleStateMachine4Testing.java   |  4 +--
 12 files changed, 54 insertions(+), 26 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
index b31185474..8e576a0ac 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
@@ -17,24 +17,41 @@
  */
 package org.apache.ratis.util;
 
+import java.util.Objects;
+
 public class Daemon extends Thread {
   {
     setDaemon(true);
   }
 
-  /** Construct a daemon thread. */
-  public Daemon() {
-    super();
+  /** Construct a daemon thread with flexible arguments. */
+  protected Daemon(Builder builder) {
+    super(builder.runnable);
+    setName(builder.name);
   }
 
-  /** Construct a daemon thread with the given runnable. */
-  public Daemon(Runnable runnable) {
-    this(runnable, runnable.toString());
+  /** @return a {@link Builder}. */
+  public static Builder newBuilder() {
+    return new Builder();
   }
 
-  /** Construct a daemon thread with the given runnable. */
-  public Daemon(Runnable runnable, String name) {
-    super(runnable);
-    this.setName(name);
+  public static class Builder {
+    private String name;
+    private Runnable runnable;
+
+    public Builder setName(String name) {
+      this.name = name;
+      return this;
+    }
+
+    public Builder setRunnable(Runnable runnable) {
+      this.runnable = runnable;
+      return this;
+    }
+
+    public Daemon build() {
+      Objects.requireNonNull(name, "name == null");
+      return new Daemon(this);
+    }
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
index aa89bde0c..1fcfc4d6a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
@@ -28,11 +28,13 @@ import java.lang.management.MemoryManagerMXBean;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 public class JvmPauseMonitor {
   public static final Logger LOG = LoggerFactory.getLogger(JvmPauseMonitor.class);
+  private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
 
   static final class GcInfo {
     private final long count;
@@ -137,7 +139,8 @@ public class JvmPauseMonitor {
 
   /** Start this monitor. */
   public void start() {
-    final MemoizedSupplier<Thread> supplier = JavaUtils.memoize(() -> new Daemon(this::run));
+    final MemoizedSupplier<Thread> supplier = JavaUtils.memoize(() -> Daemon.newBuilder()
+        .setName("JvmPauseMonitor" + THREAD_COUNT.getAndIncrement()).setRunnable(this::run).build());
     Optional.of(threadRef.updateAndGet(previous -> Optional.ofNullable(previous).orElseGet(supplier)))
         .filter(t -> supplier.isInitialized())
         .ifPresent(Thread::start);
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
index cba2851f4..d6be6c0ec 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
@@ -25,8 +25,8 @@ import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -37,6 +37,7 @@ public final class TimeoutScheduler implements TimeoutExecutor {
   static final TimeDuration DEFAULT_GRACE_PERIOD = TimeDuration.valueOf(1, TimeUnit.MINUTES);
 
   private static final Supplier<TimeoutScheduler> INSTANCE = JavaUtils.memoize(TimeoutScheduler::new);
+  private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
 
   public static TimeoutScheduler getInstance() {
     return INSTANCE.get();
@@ -84,7 +85,8 @@ public final class TimeoutScheduler implements TimeoutExecutor {
 
     private static ScheduledThreadPoolExecutor newExecutor() {
       LOG.debug("new ScheduledThreadPoolExecutor");
-      final ScheduledThreadPoolExecutor e = new ScheduledThreadPoolExecutor(1, (ThreadFactory) Daemon::new);
+      final ScheduledThreadPoolExecutor e = new ScheduledThreadPoolExecutor(1, (runnable) -> Daemon.newBuilder()
+          .setName("TimeoutScheduler-" + THREAD_COUNT.getAndIncrement()).setRunnable(runnable).build());
       e.setRemoveOnCancelPolicy(true);
       return e;
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index 52ae033f5..afad7c559 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -55,7 +55,6 @@ class FollowerState extends Daemon {
 
   static final Logger LOG = LoggerFactory.getLogger(FollowerState.class);
 
-  private final String name;
   private final Object reason;
   private final RaftServerImpl server;
 
@@ -65,8 +64,7 @@ class FollowerState extends Daemon {
   private final AtomicInteger outstandingOp = new AtomicInteger();
 
   FollowerState(RaftServerImpl server, Object reason) {
-    this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass());
-    this.setName(this.name);
+    super(newBuilder().setName(server.getMemberId() + "-" + JavaUtils.getClassSimpleName(FollowerState.class)));
     this.server = server;
     this.reason = reason;
   }
@@ -161,6 +159,6 @@ class FollowerState extends Daemon {
 
   @Override
   public String toString() {
-    return name;
+    return getName();
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index f89e8b502..14c2b6713 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -137,7 +137,8 @@ class LeaderElection implements Runnable {
 
     Executor(Object name, int size) {
       Preconditions.assertTrue(size > 0);
-      executor = Executors.newFixedThreadPool(size, r -> new Daemon(r, name + "-" + count.incrementAndGet()));
+      executor = Executors.newFixedThreadPool(size, r ->
+          Daemon.newBuilder().setName(name + "-" + count.incrementAndGet()).setRunnable(r).build());
       service = new ExecutorCompletionService<>(executor);
     }
 
@@ -189,7 +190,7 @@ class LeaderElection implements Runnable {
   LeaderElection(RaftServerImpl server, boolean skipPreVote) {
     this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()) + COUNT.incrementAndGet();
     this.lifeCycle = new LifeCycle(this);
-    this.daemon = new Daemon(this);
+    this.daemon = Daemon.newBuilder().setName(name).setRunnable(this).build();
     this.server = server;
     this.skipPreVote = skipPreVote ||
         !RaftServerConfigKeys.LeaderElection.preVote(
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 65c586530..d51a3bcfc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -621,7 +621,7 @@ class LeaderStateImpl implements LeaderState {
    */
   private class EventProcessor extends Daemon {
     public EventProcessor(String name) {
-      setName(name);
+      super(Daemon.newBuilder().setName(name));
     }
     @Override
     public void run() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 983b49f7b..f3081a267 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -114,7 +114,7 @@ class StateMachineUpdater implements Runnable {
     };
     this.purgeUptoSnapshotIndex = RaftServerConfigKeys.Log.purgeUptoSnapshotIndex(properties);
 
-    updater = new Daemon(this);
+    updater = Daemon.newBuilder().setName(name).setRunnable(this).build();
     this.awaitForSignal = new AwaitForSignal(name);
     this.stateMachineMetrics = MemoizedSupplier.valueOf(
         () -> StateMachineMetrics.getStateMachineMetrics(server, appliedIndex, stateMachine));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
index 6b2d60796..d1688987d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
@@ -48,7 +48,7 @@ class LogAppenderDaemon {
     this.logAppender = logAppender;
     this.name = logAppender + "-" + JavaUtils.getClassSimpleName(getClass());
     this.lifeCycle = new LifeCycle(name);
-    this.daemon = new Daemon(this::run, name);
+    this.daemon = Daemon.newBuilder().setName(name).setRunnable(this::run).build();
   }
 
   public boolean isWorking() {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index dfed96952..098bc4e62 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -74,6 +74,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -90,6 +91,7 @@ public abstract class MiniRaftCluster implements Closeable {
   private static final StateMachine.Registry STATEMACHINE_REGISTRY_DEFAULT = gid -> new BaseStateMachine();
   private static final TimeDuration RETRY_INTERVAL_DEFAULT =
       TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+  static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
 
   public static abstract class Factory<CLUSTER extends MiniRaftCluster> {
     public interface Get<CLUSTER extends MiniRaftCluster> {
@@ -836,7 +838,8 @@ public abstract class MiniRaftCluster implements Closeable {
     // TODO: classes like RaftLog may throw uncaught exception during shutdown (e.g. write after close)
     ExitUtils.setTerminateOnUncaughtException(false);
 
-    final ExecutorService executor = Executors.newFixedThreadPool(servers.size(), Daemon::new);
+    final ExecutorService executor = Executors.newFixedThreadPool(servers.size(), (t) ->
+        Daemon.newBuilder().setName("MiniRaftCluster-" + THREAD_COUNT.incrementAndGet()).setRunnable(t).build());
     getServers().forEach(proxy -> executor.submit(() -> JavaUtils.runAsUnchecked(proxy::close)));
     try {
       executor.shutdown();
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
index 10382f0a4..6a5c9c881 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
@@ -106,6 +106,7 @@ public class RequestHandler<REQUEST extends RaftRpcMessage,
     private final int id;
 
     HandlerDaemon(int id) {
+      super(newBuilder().setName("HandlerDaemon-" + id));
       this.id = id;
     }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index 91905d599..863432d9c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -51,15 +51,18 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
 class SimulatedServerRpc implements RaftServerRpc {
   static final Logger LOG = LoggerFactory.getLogger(SimulatedServerRpc.class);
+  static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
 
   private final RaftServer server;
   private final RequestHandler<RaftServerRequest, RaftServerReply> serverHandler;
   private final RequestHandler<RaftClientRequest, RaftClientReply> clientHandler;
-  private final ExecutorService executor = Executors.newFixedThreadPool(3, Daemon::new);
+  private final ExecutorService executor = Executors.newFixedThreadPool(3, (t) ->
+      Daemon.newBuilder().setName("SimulatedServerRpc-" + THREAD_COUNT.incrementAndGet()).setRunnable(t).build());
 
   SimulatedServerRpc(RaftServer server,
       SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply,
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 2b4d2b872..cf715585e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -169,7 +169,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
   private RaftGroupId groupId;
 
   public SimpleStateMachine4Testing() {
-    checkpointer = new Daemon(() -> {
+    checkpointer = Daemon.newBuilder().setName("SimpleStateMachine4Testing").setRunnable(() -> {
       while (running) {
         if (indexMap.lastKey() - endIndexLastCkpt >= SNAPSHOT_THRESHOLD) {
           endIndexLastCkpt = takeSnapshot();
@@ -181,7 +181,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
           Thread.currentThread().interrupt();
         }
       }
-    });
+    }).build();
   }
 
   public Collecting collecting() {