You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/29 01:05:32 UTC
[incubator-ratis] branch master updated: RATIS-1185. Avoid directly
using RaftServerProxy in JvmPauseMonitor. (#304)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 5ee32b5 RATIS-1185. Avoid directly using RaftServerProxy in JvmPauseMonitor. (#304)
5ee32b5 is described below
commit 5ee32b57ae929cc60f2217bbb09314d594ffb2f8
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Sun Nov 29 09:05:22 2020 +0800
RATIS-1185. Avoid directly using RaftServerProxy in JvmPauseMonitor. (#304)
---
.../main/java/org/apache/ratis/util/JavaUtils.java | 13 --
.../org/apache/ratis/util/JvmPauseMonitor.java | 163 +++++++++++++++++++
.../java/org/apache/ratis/util/TimeDuration.java | 16 +-
.../main/java/org/apache/ratis/util/Timestamp.java | 9 ++
.../org/apache/ratis/server/JvmPauseMonitor.java | 180 ---------------------
.../apache/ratis/server/RaftServerConfigKeys.java | 8 +-
.../apache/ratis/server/impl/FollowerState.java | 16 +-
.../apache/ratis/server/impl/LeaderElection.java | 6 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 13 +-
.../apache/ratis/server/impl/RaftServerProxy.java | 19 ++-
10 files changed, 224 insertions(+), 219 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 1511688..8cf895b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -255,17 +255,4 @@ public interface JavaUtils {
throw new CompletionException(t);
}
}
-
- static boolean sleep(long sleepMs, long thresholdMs) throws InterruptedException {
- final Timestamp t = Timestamp.currentTime();
- Thread.sleep(sleepMs);
- final long elapsedMs = t.elapsedTimeMs();
- if (elapsedMs - sleepMs > thresholdMs) {
- LOG.warn("Unexpected long sleep: sleep({}ms) actually took {}ms which is over the threshold {}ms",
- sleepMs, elapsedMs, thresholdMs);
- return false;
- }
- return true;
- }
-
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
new file mode 100644
index 0000000..cd92297
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.util.function.CheckedConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryManagerMXBean;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+public class JvmPauseMonitor {
+ public static final Logger LOG = LoggerFactory.getLogger(JvmPauseMonitor.class);
+
+ static final class GcInfo {
+ private final long count;
+ private final long timeMs;
+
+ private GcInfo(GarbageCollectorMXBean gcBean) {
+ this(gcBean.getCollectionCount(), gcBean.getCollectionTime());
+ }
+
+ private GcInfo(long count, long timeMs) {
+ this.count = count;
+ this.timeMs = timeMs;
+ }
+
+ GcInfo subtract(GcInfo that) {
+ return new GcInfo(this.count - that.count, this.timeMs - that.timeMs);
+ }
+
+ @Override
+ public String toString() {
+ return "count=" + count + " time=" + timeMs + "ms";
+ }
+ }
+
+ static Map<String, GcInfo> getGcTimes() {
+ return ManagementFactory.getGarbageCollectorMXBeans().stream()
+ .collect(Collectors.toMap(MemoryManagerMXBean::getName, GcInfo::new));
+ }
+
+ static String toString(Map<String, GcInfo> beforeSleep, TimeDuration extraSleepTime, Map<String, GcInfo> afterSleep) {
+ final StringBuilder b = new StringBuilder("Detected pause in JVM or host machine (eg GC): pause of approximately ")
+ .append(extraSleepTime)
+ .append(System.lineSeparator());
+
+ boolean detected = false;
+ for(Map.Entry<String, GcInfo> before: beforeSleep.entrySet()) {
+ final String name = before.getKey();
+ final GcInfo after = afterSleep.get(name);
+ if (after != null) {
+ final GcInfo diff = after.subtract(before.getValue());
+ if (diff.count != 0) {
+ b.append(System.lineSeparator()).append("GC pool '").append(name)
+ .append("' had collection(s): ").append(diff);
+ detected = true;
+ }
+ }
+ }
+
+ if (!detected) {
+ b.append(System.lineSeparator()).append("No GCs detected");
+ }
+ return b.toString();
+ }
+
+ private static final TimeDuration SLEEP_TIME = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
+ private static final TimeDuration WARN_THRESHOLD = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+
+ private final String name;
+ private final AtomicReference<Thread> threadRef = new AtomicReference<>();
+ private final CheckedConsumer<TimeDuration, IOException> handler;
+
+ public JvmPauseMonitor(Object name, CheckedConsumer<TimeDuration, IOException> handler) {
+ this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name;
+ this.handler = handler;
+ }
+
+ private void run() {
+ LOG.info("{}: Started", this);
+ try {
+ for (; Thread.currentThread().equals(threadRef.get()); ) {
+ detectPause();
+ }
+ } finally {
+ LOG.info("{}: Stopped", this);
+ }
+ }
+
+ private void detectPause() {
+ final Map<String, GcInfo> before = getGcTimes();
+ final TimeDuration extraSleep;
+ try {
+ extraSleep = SLEEP_TIME.sleep();
+ } catch (InterruptedException ie) {
+ return;
+ }
+
+ if (extraSleep.compareTo(WARN_THRESHOLD) > 0) {
+ final Map<String, GcInfo> after = getGcTimes();
+ LOG.warn("{}: {}", this, toString(before, extraSleep, after));
+ }
+
+ handle(extraSleep);
+ }
+
+ private void handle(TimeDuration extraSleep) {
+ try {
+ handler.accept(extraSleep);
+ } catch (Throwable t) {
+ LOG.error("{}: Failed to handle extra sleep {}", this, extraSleep, t);
+ }
+ }
+
+ /** Start this monitor. */
+ public void start() {
+ final MemoizedSupplier<Thread> supplier = JavaUtils.memoize(() -> new Daemon(this::run));
+ Optional.of(threadRef.updateAndGet(previous -> Optional.ofNullable(previous).orElseGet(supplier)))
+ .filter(t -> supplier.isInitialized())
+ .ifPresent(Thread::start);
+ }
+
+ /** Stop this monitor. */
+ public void stop() {
+ final Thread previous = threadRef.getAndSet(null);
+ if (previous != null) {
+ previous.interrupt();
+ try {
+ previous.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 1c186ff..54d0dab 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -299,15 +299,22 @@ public final class TimeDuration implements Comparable<TimeDuration> {
return duration <= 0;
}
- /** Performs a {@link TimeUnit#sleep(long)} using this {@link TimeDuration}. */
- public void sleep() throws InterruptedException {
- sleep(null);
+ /** The same as sleep(null). */
+ public TimeDuration sleep() throws InterruptedException {
+ return sleep(null);
}
- public void sleep(Consumer<Object> log) throws InterruptedException {
+ /**
+ * Performs a {@link TimeUnit#sleep(long)} using this {@link TimeDuration}.
+ *
+ * @param log If not null, use it to print log messages.
+ * @return the difference of the actual sleep time duration and this {@link TimeDuration}.
+ */
+ public TimeDuration sleep(Consumer<Object> log) throws InterruptedException {
if (log != null) {
log.accept(StringUtils.stringSupplierAsObject(() -> "Start sleeping " + this));
}
+ final Timestamp start = Timestamp.currentTime();
try {
unit.sleep(duration);
if (log != null) {
@@ -320,6 +327,7 @@ public final class TimeDuration implements Comparable<TimeDuration> {
}
throw ie;
}
+ return start.elapsedTime().subtract(this);
}
@Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java b/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
index 8ce45ae..ba5fb8c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
@@ -67,6 +67,15 @@ public final class Timestamp implements Comparable<Timestamp> {
}
/**
+ * @param t the time period to be added.
+ * @return a new {@link Timestamp} whose value is calculated
+ * by adding the given time duration to this timestamp.
+ */
+ public Timestamp addTime(TimeDuration t) {
+ return new Timestamp(nanos + t.to(TimeUnit.NANOSECONDS).getDuration());
+ }
+
+ /**
* @return the elapsed time in milliseconds.
* If the timestamp is a future time, the returned value is negative.
*/
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java b/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java
deleted file mode 100644
index 9470110..0000000
--- a/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.server;
-
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
-import org.apache.ratis.thirdparty.com.google.common.base.Joiner;
-import org.apache.ratis.thirdparty.com.google.common.base.Stopwatch;
-import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
-import org.apache.ratis.thirdparty.com.google.common.collect.Maps;
-import org.apache.ratis.thirdparty.com.google.common.collect.Sets;
-import org.apache.ratis.util.Daemon;
-import org.apache.ratis.util.JavaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-
-public class JvmPauseMonitor {
- public static final Logger LOG =
- LoggerFactory.getLogger(JvmPauseMonitor.class);
- /**
- * The target sleep time
- */
- private static final long SLEEP_INTERVAL_MS = 500;
-
- private Thread monitorThread;
- private volatile boolean shouldRun = true;
- private final RaftServerProxy proxy;
-
- private String formatMessage(long extraSleepTime,
- Map<String, GcTimes> gcTimesAfterSleep,
- Map<String, GcTimes> gcTimesBeforeSleep) {
-
- Set<String> gcBeanNames = Sets.intersection(gcTimesAfterSleep.keySet(),
- gcTimesBeforeSleep.keySet());
- List<String> gcDiffs = Lists.newArrayList();
- for (String name : gcBeanNames) {
- GcTimes diff = gcTimesAfterSleep.get(name)
- .subtract(gcTimesBeforeSleep.get(name));
- if (diff.gcCount != 0) {
- gcDiffs.add("GC pool '" + name + "' had collection(s): " + diff);
- }
- }
-
- String ret = "Detected pause in JVM or host machine (eg GC): "
- + "pause of approximately " + extraSleepTime + "ms\n";
- if (gcDiffs.isEmpty()) {
- ret += "No GCs detected";
- } else {
- ret += Joiner.on("\n").join(gcDiffs);
- }
- return ret;
- }
-
- public JvmPauseMonitor(RaftServerProxy proxy) {
- this.proxy = proxy;
- }
-
- private Map<String, GcTimes> getGcTimes() {
- Map<String, GcTimes> map = Maps.newHashMap();
- List<GarbageCollectorMXBean> gcBeans =
- ManagementFactory.getGarbageCollectorMXBeans();
- for (GarbageCollectorMXBean gcBean : gcBeans) {
- map.put(gcBean.getName(), new GcTimes(gcBean));
- }
- return map;
- }
-
- private static final class GcTimes {
- private GcTimes(GarbageCollectorMXBean gcBean) {
- gcCount = gcBean.getCollectionCount();
- gcTimeMillis = gcBean.getCollectionTime();
- }
-
- private GcTimes(long count, long time) {
- this.gcCount = count;
- this.gcTimeMillis = time;
- }
-
- private GcTimes subtract(GcTimes other) {
- return new GcTimes(this.gcCount - other.gcCount,
- this.gcTimeMillis - other.gcTimeMillis);
- }
-
- @Override
- public String toString() {
- return "count=" + gcCount + " time=" + gcTimeMillis + "ms";
- }
-
- private long gcCount;
- private long gcTimeMillis;
- }
-
- class Monitor implements Runnable {
- private final String name = JvmPauseMonitor.this + "-" + JavaUtils
- .getClassSimpleName(getClass());
-
- public void run() {
- int leaderStepDownWaitTime =
- RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(proxy.getProperties()).toIntExact(TimeUnit
- .MILLISECONDS);
- int rpcSlownessTimeoutMs = RaftServerConfigKeys.Rpc.slownessTimeout(proxy.getProperties()).toIntExact(
- TimeUnit.MILLISECONDS);
- Stopwatch sw = Stopwatch.createUnstarted();
- Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
- LOG.info("Starting Ratis JVM pause monitor");
- while (shouldRun) {
- sw.reset().start();
- try {
- Thread.sleep(SLEEP_INTERVAL_MS);
- } catch (InterruptedException ie) {
- return;
- }
- long extraSleepTime = sw.elapsed(TimeUnit.MILLISECONDS) - SLEEP_INTERVAL_MS;
- Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
-
- if (extraSleepTime > 100) {
- LOG.warn(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
- }
- try {
- if (extraSleepTime > rpcSlownessTimeoutMs) {
- // close down all pipelines if the total gc period exceeds
- // rpc slowness timeout
- proxy.close();
- } else if (extraSleepTime > leaderStepDownWaitTime) {
- proxy.getImpls().forEach(RaftServerImpl::stepDownOnJvmPause);
- }
- } catch (IOException ioe) {
- LOG.info("Encountered exception in JvmPauseMonitor {}", ioe);
- }
- }
- }
-
- @Override
- public String toString() {
- return name;
- }
- }
-
-
- public void start() {
- monitorThread = new Daemon(new Monitor());
- monitorThread.start();
- }
-
- public void stop() {
- shouldRun = false;
- if (monitorThread != null) {
- monitorThread.interrupt();
- try {
- monitorThread.join();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index e652408..9543380 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -62,10 +62,10 @@ public interface RaftServerConfigKeys {
}
String SLEEP_DEVIATION_THRESHOLD_KEY = PREFIX + ".sleep.deviation.threshold";
- int SLEEP_DEVIATION_THRESHOLD_DEFAULT = 300;
- static int sleepDeviationThreshold(RaftProperties properties) {
- return getInt(properties::getInt, SLEEP_DEVIATION_THRESHOLD_KEY,
- SLEEP_DEVIATION_THRESHOLD_DEFAULT, getDefaultLog());
+ TimeDuration SLEEP_DEVIATION_THRESHOLD_DEFAULT = TimeDuration.valueOf(300, TimeUnit.MILLISECONDS);
+ static TimeDuration sleepDeviationThreshold(RaftProperties properties) {
+ return getTimeDuration(properties.getTimeDuration(SLEEP_DEVIATION_THRESHOLD_DEFAULT.getUnit()),
+ SLEEP_DEVIATION_THRESHOLD_KEY, SLEEP_DEVIATION_THRESHOLD_DEFAULT, getDefaultLog());
}
static void setSleepDeviationThreshold(RaftProperties properties, int thresholdMs) {
setInt(properties::setInt, SLEEP_DEVIATION_THRESHOLD_KEY, thresholdMs);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index 7c08a0e..b3173bb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -109,11 +109,14 @@ class FollowerState extends Daemon {
@Override
public void run() {
- long sleepDeviationThresholdMs = server.getSleepDeviationThresholdMs();
+ final TimeDuration sleepDeviationThreshold = server.getSleepDeviationThreshold();
while (isRunning && server.isFollower()) {
- final long electionTimeout = server.getRandomTimeoutMs();
+ final TimeDuration electionTimeout = server.getRandomElectionTimeout();
try {
- if (!JavaUtils.sleep(electionTimeout, sleepDeviationThresholdMs)) {
+ final TimeDuration extraSleep = electionTimeout.sleep();
+ if (extraSleep.compareTo(sleepDeviationThreshold) > 0) {
+ LOG.warn("Unexpected long sleep: sleep {} but took extra {} (> threshold = {})",
+ electionTimeout, extraSleep, sleepDeviationThreshold);
continue;
}
@@ -123,10 +126,11 @@ class FollowerState extends Daemon {
break;
}
synchronized (server) {
- if (outstandingOp.get() == 0 && lastRpcTime.elapsedTimeMs() >= electionTimeout
+ if (outstandingOp.get() == 0
+ && lastRpcTime.elapsedTime().compareTo(electionTimeout) >= 0
&& !lostMajorityHeartbeatsRecently()) {
- LOG.info("{}: change to CANDIDATE, lastRpcTime:{}ms, electionTimeout:{}ms",
- this, lastRpcTime.elapsedTimeMs(), electionTimeout);
+ LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}, electionTimeout:{}",
+ this, lastRpcTime.elapsedTime(), electionTimeout);
server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters.
// election timeout, should become a candidate
server.changeToCandidate();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index dbbefa8..1a31d66 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -285,7 +285,7 @@ class LeaderElection implements Runnable {
private ResultAndTerm waitForResults(final long electionTerm, final int submitted,
RaftConfiguration conf, Executor voteExecutor) throws InterruptedException {
- final Timestamp timeout = Timestamp.currentTime().addTimeMs(server.getRandomTimeoutMs());
+ final Timestamp timeout = Timestamp.currentTime().addTime(server.getRandomElectionTimeout());
final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
final List<Exception> exceptions = new ArrayList<>();
int waitForNum = submitted;
@@ -336,9 +336,7 @@ class LeaderElection implements Runnable {
// remove higher priority peer, so that we check higherPriorityPeers empty to make sure
// all higher priority peers have replied
- if (higherPriorityPeers.contains(replierId)) {
- higherPriorityPeers.remove(replierId);
- }
+ higherPriorityPeers.remove(replierId);
if (r.getServerReply().getSuccess()) {
votedPeers.add(replierId);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index a1dcc36..1393b0e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -91,7 +91,7 @@ public class RaftServerImpl implements RaftServer.Division,
private final int maxTimeoutMs;
private final TimeDuration leaderStepDownWaitTime;
private final int rpcSlownessTimeoutMs;
- private final int sleepDeviationThresholdMs;
+ private final TimeDuration sleepDeviationThreshold;
private final boolean installSnapshotEnabled;
private final LifeCycle lifeCycle;
@@ -128,7 +128,7 @@ public class RaftServerImpl implements RaftServer.Division,
maxTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMax(properties).toIntExact(TimeUnit.MILLISECONDS);
rpcSlownessTimeoutMs = RaftServerConfigKeys.Rpc.slownessTimeout(properties).toIntExact(TimeUnit.MILLISECONDS);
leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
- sleepDeviationThresholdMs = RaftServerConfigKeys.sleepDeviationThreshold(properties);
+ this.sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties);
installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs,
"max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
@@ -168,16 +168,17 @@ public class RaftServerImpl implements RaftServer.Division,
return rpcSlownessTimeoutMs;
}
- int getRandomTimeoutMs() {
- return minTimeoutMs + ThreadLocalRandom.current().nextInt(maxTimeoutMs - minTimeoutMs + 1);
+ TimeDuration getRandomElectionTimeout() {
+ final long millis = minTimeoutMs + ThreadLocalRandom.current().nextInt(maxTimeoutMs - minTimeoutMs + 1);
+ return TimeDuration.valueOf(millis, TimeUnit.MILLISECONDS);
}
TimeDuration getLeaderStepDownWaitTime() {
return leaderStepDownWaitTime;
}
- int getSleepDeviationThresholdMs() {
- return sleepDeviationThresholdMs;
+ TimeDuration getSleepDeviationThreshold() {
+ return sleepDeviationThreshold;
}
@Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index f1dbe2f..b18b07b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -35,7 +35,7 @@ import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.JvmPauseMonitor;
+import org.apache.ratis.util.JvmPauseMonitor;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.statemachine.StateMachine;
@@ -44,6 +44,7 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedFunction;
import java.io.Closeable;
@@ -168,6 +169,7 @@ public class RaftServerProxy implements RaftServer {
private final ImplMap impls = new ImplMap();
private final ExecutorService implExecutor = Executors.newSingleThreadExecutor();
+
private final JvmPauseMonitor pauseMonitor;
RaftServerProxy(RaftPeerId id, StateMachine.Registry stateMachineRegistry,
@@ -184,7 +186,20 @@ public class RaftServerProxy implements RaftServer {
this.lifeCycle = new LifeCycle(this.id + "-" + JavaUtils.getClassSimpleName(getClass()));
this.dataStreamServerRpc = new DataStreamServerImpl(this, parameters).getServerRpc();
- this.pauseMonitor = new JvmPauseMonitor(this);
+
+ final TimeDuration rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties);
+ final TimeDuration leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
+ this.pauseMonitor = new JvmPauseMonitor(id,
+ extraSleep -> handleJvmPause(extraSleep, rpcSlownessTimeout, leaderStepDownWaitTime));
+ }
+
+ private void handleJvmPause(TimeDuration extraSleep, TimeDuration closeThreshold, TimeDuration stepDownThreshold)
+ throws IOException {
+ if (extraSleep.compareTo(closeThreshold) > 0) {
+ close();
+ } else if (extraSleep.compareTo(stepDownThreshold) > 0) {
+ getImpls().forEach(RaftServerImpl::stepDownOnJvmPause);
+ }
}
/** Check the storage dir and add groups*/