You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/09/30 06:45:19 UTC
[ratis] 02/03: RATIS-1695. Use a Builder for Daemon (#747)
This is an automated email from the ASF dual-hosted git repository.
dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git
commit c596fd3383f9a74645b3467a462519488b842156
Author: Jiacheng Liu <ji...@gmail.com>
AuthorDate: Wed Sep 14 19:30:44 2022 +0800
RATIS-1695. Use a Builder for Daemon (#747)
(cherry picked from commit a04bf69cacec68e81c39a4f5db92e9eb73dd0cb7)
---
.../main/java/org/apache/ratis/util/Daemon.java | 37 ++++++++++++++++------
.../org/apache/ratis/util/JvmPauseMonitor.java | 5 ++-
.../org/apache/ratis/util/TimeoutScheduler.java | 6 ++--
.../apache/ratis/server/impl/FollowerState.java | 6 ++--
.../apache/ratis/server/impl/LeaderElection.java | 5 +--
.../apache/ratis/server/impl/LeaderStateImpl.java | 2 +-
.../ratis/server/impl/StateMachineUpdater.java | 2 +-
.../ratis/server/leader/LogAppenderDaemon.java | 2 +-
.../apache/ratis/server/impl/MiniRaftCluster.java | 5 ++-
.../ratis/server/simulation/RequestHandler.java | 1 +
.../server/simulation/SimulatedServerRpc.java | 5 ++-
.../statemachine/SimpleStateMachine4Testing.java | 4 +--
12 files changed, 54 insertions(+), 26 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
index b31185474..8e576a0ac 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
@@ -17,24 +17,41 @@
*/
package org.apache.ratis.util;
+import java.util.Objects;
+
public class Daemon extends Thread {
{
setDaemon(true);
}
- /** Construct a daemon thread. */
- public Daemon() {
- super();
+ /** Construct a daemon thread with flexible arguments. */
+ protected Daemon(Builder builder) {
+ super(builder.runnable);
+ setName(builder.name);
}
- /** Construct a daemon thread with the given runnable. */
- public Daemon(Runnable runnable) {
- this(runnable, runnable.toString());
+ /** @return a {@link Builder}. */
+ public static Builder newBuilder() {
+ return new Builder();
}
- /** Construct a daemon thread with the given runnable. */
- public Daemon(Runnable runnable, String name) {
- super(runnable);
- this.setName(name);
+ public static class Builder {
+ private String name;
+ private Runnable runnable;
+
+ public Builder setName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public Builder setRunnable(Runnable runnable) {
+ this.runnable = runnable;
+ return this;
+ }
+
+ public Daemon build() {
+ Objects.requireNonNull(name, "name == null");
+ return new Daemon(this);
+ }
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
index aa89bde0c..1fcfc4d6a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
@@ -28,11 +28,13 @@ import java.lang.management.MemoryManagerMXBean;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public class JvmPauseMonitor {
public static final Logger LOG = LoggerFactory.getLogger(JvmPauseMonitor.class);
+ private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
static final class GcInfo {
private final long count;
@@ -137,7 +139,8 @@ public class JvmPauseMonitor {
/** Start this monitor. */
public void start() {
- final MemoizedSupplier<Thread> supplier = JavaUtils.memoize(() -> new Daemon(this::run));
+ final MemoizedSupplier<Thread> supplier = JavaUtils.memoize(() -> Daemon.newBuilder()
+ .setName("JvmPauseMonitor" + THREAD_COUNT.getAndIncrement()).setRunnable(this::run).build());
Optional.of(threadRef.updateAndGet(previous -> Optional.ofNullable(previous).orElseGet(supplier)))
.filter(t -> supplier.isInitialized())
.ifPresent(Thread::start);
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
index cba2851f4..d6be6c0ec 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
@@ -25,8 +25,8 @@ import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -37,6 +37,7 @@ public final class TimeoutScheduler implements TimeoutExecutor {
static final TimeDuration DEFAULT_GRACE_PERIOD = TimeDuration.valueOf(1, TimeUnit.MINUTES);
private static final Supplier<TimeoutScheduler> INSTANCE = JavaUtils.memoize(TimeoutScheduler::new);
+ private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
public static TimeoutScheduler getInstance() {
return INSTANCE.get();
@@ -84,7 +85,8 @@ public final class TimeoutScheduler implements TimeoutExecutor {
private static ScheduledThreadPoolExecutor newExecutor() {
LOG.debug("new ScheduledThreadPoolExecutor");
- final ScheduledThreadPoolExecutor e = new ScheduledThreadPoolExecutor(1, (ThreadFactory) Daemon::new);
+ final ScheduledThreadPoolExecutor e = new ScheduledThreadPoolExecutor(1, (runnable) -> Daemon.newBuilder()
+ .setName("TimeoutScheduler-" + THREAD_COUNT.getAndIncrement()).setRunnable(runnable).build());
e.setRemoveOnCancelPolicy(true);
return e;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index 52ae033f5..afad7c559 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -55,7 +55,6 @@ class FollowerState extends Daemon {
static final Logger LOG = LoggerFactory.getLogger(FollowerState.class);
- private final String name;
private final Object reason;
private final RaftServerImpl server;
@@ -65,8 +64,7 @@ class FollowerState extends Daemon {
private final AtomicInteger outstandingOp = new AtomicInteger();
FollowerState(RaftServerImpl server, Object reason) {
- this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass());
- this.setName(this.name);
+ super(newBuilder().setName(server.getMemberId() + "-" + JavaUtils.getClassSimpleName(FollowerState.class)));
this.server = server;
this.reason = reason;
}
@@ -161,6 +159,6 @@ class FollowerState extends Daemon {
@Override
public String toString() {
- return name;
+ return getName();
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 5ed18975e..cc7f623e3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -138,7 +138,8 @@ class LeaderElection implements Runnable {
Executor(Object name, int size) {
Preconditions.assertTrue(size > 0);
- executor = Executors.newFixedThreadPool(size, r -> new Daemon(r, name + "-" + count.incrementAndGet()));
+ executor = Executors.newFixedThreadPool(size, r ->
+ Daemon.newBuilder().setName(name + "-" + count.incrementAndGet()).setRunnable(r).build());
service = new ExecutorCompletionService<>(executor);
}
@@ -190,7 +191,7 @@ class LeaderElection implements Runnable {
LeaderElection(RaftServerImpl server, boolean skipPreVote) {
this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()) + COUNT.incrementAndGet();
this.lifeCycle = new LifeCycle(this);
- this.daemon = new Daemon(this);
+ this.daemon = Daemon.newBuilder().setName(name).setRunnable(this).build();
this.server = server;
this.skipPreVote = skipPreVote ||
!RaftServerConfigKeys.LeaderElection.preVote(
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index fbcbce448..fe36a1a6a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -618,7 +618,7 @@ class LeaderStateImpl implements LeaderState {
*/
private class EventProcessor extends Daemon {
public EventProcessor(String name) {
- setName(name);
+ super(Daemon.newBuilder().setName(name));
}
@Override
public void run() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 87aac06b2..321d1d71a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -110,7 +110,7 @@ class StateMachineUpdater implements Runnable {
};
this.purgeUptoSnapshotIndex = RaftServerConfigKeys.Log.purgeUptoSnapshotIndex(properties);
- updater = new Daemon(this);
+ updater = Daemon.newBuilder().setName(name).setRunnable(this).build();
this.awaitForSignal = new AwaitForSignal(name);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
index 6b2d60796..d1688987d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
@@ -48,7 +48,7 @@ class LogAppenderDaemon {
this.logAppender = logAppender;
this.name = logAppender + "-" + JavaUtils.getClassSimpleName(getClass());
this.lifeCycle = new LifeCycle(name);
- this.daemon = new Daemon(this::run, name);
+ this.daemon = Daemon.newBuilder().setName(name).setRunnable(this::run).build();
}
public boolean isWorking() {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 1f4047524..64638a2e8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -73,6 +73,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -89,6 +90,7 @@ public abstract class MiniRaftCluster implements Closeable {
private static final StateMachine.Registry STATEMACHINE_REGISTRY_DEFAULT = gid -> new BaseStateMachine();
private static final TimeDuration RETRY_INTERVAL_DEFAULT =
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+ static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
public static abstract class Factory<CLUSTER extends MiniRaftCluster> {
public interface Get<CLUSTER extends MiniRaftCluster> {
@@ -834,7 +836,8 @@ public abstract class MiniRaftCluster implements Closeable {
// TODO: classes like RaftLog may throw uncaught exception during shutdown (e.g. write after close)
ExitUtils.setTerminateOnUncaughtException(false);
- final ExecutorService executor = Executors.newFixedThreadPool(servers.size(), Daemon::new);
+ final ExecutorService executor = Executors.newFixedThreadPool(servers.size(), (t) ->
+ Daemon.newBuilder().setName("MiniRaftCluster-" + THREAD_COUNT.incrementAndGet()).setRunnable(t).build());
getServers().forEach(proxy -> executor.submit(() -> JavaUtils.runAsUnchecked(proxy::close)));
try {
executor.shutdown();
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
index 10382f0a4..6a5c9c881 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
@@ -106,6 +106,7 @@ public class RequestHandler<REQUEST extends RaftRpcMessage,
private final int id;
HandlerDaemon(int id) {
+ super(newBuilder().setName("HandlerDaemon-" + id));
this.id = id;
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index 91905d599..863432d9c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -51,15 +51,18 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
class SimulatedServerRpc implements RaftServerRpc {
static final Logger LOG = LoggerFactory.getLogger(SimulatedServerRpc.class);
+ static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
private final RaftServer server;
private final RequestHandler<RaftServerRequest, RaftServerReply> serverHandler;
private final RequestHandler<RaftClientRequest, RaftClientReply> clientHandler;
- private final ExecutorService executor = Executors.newFixedThreadPool(3, Daemon::new);
+ private final ExecutorService executor = Executors.newFixedThreadPool(3, (t) ->
+ Daemon.newBuilder().setName("SimulatedServerRpc-" + THREAD_COUNT.incrementAndGet()).setRunnable(t).build());
SimulatedServerRpc(RaftServer server,
SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply,
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 7b5527090..c8755abab 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -169,7 +169,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
private RaftGroupId groupId;
public SimpleStateMachine4Testing() {
- checkpointer = new Daemon(() -> {
+ checkpointer = Daemon.newBuilder().setName("SimpleStateMachine4Testing").setRunnable(() -> {
while (running) {
if (indexMap.lastKey() - endIndexLastCkpt >= SNAPSHOT_THRESHOLD) {
endIndexLastCkpt = takeSnapshot();
@@ -181,7 +181,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
Thread.currentThread().interrupt();
}
}
- });
+ }).build();
}
public Collecting collecting() {