You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/29 01:05:32 UTC

[incubator-ratis] branch master updated: RATIS-1185. Avoid directly using RaftServerProxy in JvmPauseMonitor. (#304)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ee32b5  RATIS-1185. Avoid directly using RaftServerProxy in JvmPauseMonitor. (#304)
5ee32b5 is described below

commit 5ee32b57ae929cc60f2217bbb09314d594ffb2f8
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Sun Nov 29 09:05:22 2020 +0800

    RATIS-1185. Avoid directly using RaftServerProxy in JvmPauseMonitor. (#304)
---
 .../main/java/org/apache/ratis/util/JavaUtils.java |  13 --
 .../org/apache/ratis/util/JvmPauseMonitor.java     | 163 +++++++++++++++++++
 .../java/org/apache/ratis/util/TimeDuration.java   |  16 +-
 .../main/java/org/apache/ratis/util/Timestamp.java |   9 ++
 .../org/apache/ratis/server/JvmPauseMonitor.java   | 180 ---------------------
 .../apache/ratis/server/RaftServerConfigKeys.java  |   8 +-
 .../apache/ratis/server/impl/FollowerState.java    |  16 +-
 .../apache/ratis/server/impl/LeaderElection.java   |   6 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |  13 +-
 .../apache/ratis/server/impl/RaftServerProxy.java  |  19 ++-
 10 files changed, 224 insertions(+), 219 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 1511688..8cf895b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -255,17 +255,4 @@ public interface JavaUtils {
       throw new CompletionException(t);
     }
   }
-
-  static boolean sleep(long sleepMs, long thresholdMs) throws InterruptedException {
-    final Timestamp t = Timestamp.currentTime();
-    Thread.sleep(sleepMs);
-    final long elapsedMs = t.elapsedTimeMs();
-    if (elapsedMs - sleepMs > thresholdMs) {
-      LOG.warn("Unexpected long sleep: sleep({}ms) actually took {}ms which is over the threshold {}ms",
-          sleepMs, elapsedMs, thresholdMs);
-      return false;
-    }
-    return true;
-  }
-
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
new file mode 100644
index 0000000..cd92297
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.util.function.CheckedConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryManagerMXBean;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+public class JvmPauseMonitor {
+  public static final Logger LOG = LoggerFactory.getLogger(JvmPauseMonitor.class);
+
+  static final class GcInfo {
+    private final long count;
+    private final long timeMs;
+
+    private GcInfo(GarbageCollectorMXBean gcBean) {
+      this(gcBean.getCollectionCount(), gcBean.getCollectionTime());
+    }
+
+    private GcInfo(long count, long timeMs) {
+      this.count = count;
+      this.timeMs = timeMs;
+    }
+
+    GcInfo subtract(GcInfo that) {
+      return new GcInfo(this.count - that.count, this.timeMs - that.timeMs);
+    }
+
+    @Override
+    public String toString() {
+      return "count=" + count + " time=" + timeMs + "ms";
+    }
+  }
+
+  static Map<String, GcInfo> getGcTimes() {
+    return ManagementFactory.getGarbageCollectorMXBeans().stream()
+        .collect(Collectors.toMap(MemoryManagerMXBean::getName, GcInfo::new));
+  }
+
+  static String toString(Map<String, GcInfo> beforeSleep, TimeDuration extraSleepTime, Map<String, GcInfo> afterSleep) {
+    final StringBuilder b = new StringBuilder("Detected pause in JVM or host machine (eg GC): pause of approximately ")
+        .append(extraSleepTime)
+        .append(System.lineSeparator());
+
+    boolean detected = false;
+    for(Map.Entry<String, GcInfo> before: beforeSleep.entrySet()) {
+      final String name = before.getKey();
+      final GcInfo after = afterSleep.get(name);
+      if (after != null) {
+        final GcInfo diff = after.subtract(before.getValue());
+        if (diff.count != 0) {
+          b.append(System.lineSeparator()).append("GC pool '").append(name)
+              .append("' had collection(s): ").append(diff);
+          detected = true;
+        }
+      }
+    }
+
+    if (!detected) {
+      b.append(System.lineSeparator()).append("No GCs detected");
+    }
+    return b.toString();
+  }
+
+  private static final TimeDuration SLEEP_TIME = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
+  private static final TimeDuration WARN_THRESHOLD = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+
+  private final String name;
+  private final AtomicReference<Thread> threadRef = new AtomicReference<>();
+  private final CheckedConsumer<TimeDuration, IOException> handler;
+
+  public JvmPauseMonitor(Object name, CheckedConsumer<TimeDuration, IOException> handler) {
+    this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name;
+    this.handler = handler;
+  }
+
+  private void run() {
+    LOG.info("{}: Started", this);
+    try {
+      for (; Thread.currentThread().equals(threadRef.get()); ) {
+        detectPause();
+      }
+    } finally {
+      LOG.info("{}: Stopped", this);
+    }
+  }
+
+  private void detectPause() {
+    final Map<String, GcInfo> before = getGcTimes();
+    final TimeDuration extraSleep;
+    try {
+      extraSleep = SLEEP_TIME.sleep();
+    } catch (InterruptedException ie) {
+      return;
+    }
+
+    if (extraSleep.compareTo(WARN_THRESHOLD) > 0) {
+      final Map<String, GcInfo> after = getGcTimes();
+      LOG.warn("{}: {}", this, toString(before, extraSleep, after));
+    }
+
+    handle(extraSleep);
+  }
+
+  private void handle(TimeDuration extraSleep) {
+    try {
+      handler.accept(extraSleep);
+    } catch (Throwable t) {
+      LOG.error("{}: Failed to handle extra sleep {}", this, extraSleep, t);
+    }
+  }
+
+  /** Start this monitor. */
+  public void start() {
+    final MemoizedSupplier<Thread> supplier = JavaUtils.memoize(() -> new Daemon(this::run));
+    Optional.of(threadRef.updateAndGet(previous -> Optional.ofNullable(previous).orElseGet(supplier)))
+        .filter(t -> supplier.isInitialized())
+        .ifPresent(Thread::start);
+  }
+
+  /** Stop this monitor. */
+  public void stop() {
+    final Thread previous = threadRef.getAndSet(null);
+    if (previous != null) {
+      previous.interrupt();
+      try {
+        previous.join();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 1c186ff..54d0dab 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -299,15 +299,22 @@ public final class TimeDuration implements Comparable<TimeDuration> {
     return duration <= 0;
   }
 
-  /** Performs a {@link TimeUnit#sleep(long)} using this {@link TimeDuration}. */
-  public void sleep() throws InterruptedException {
-    sleep(null);
+  /** The same as sleep(null). */
+  public TimeDuration sleep() throws InterruptedException {
+    return sleep(null);
   }
 
-  public void sleep(Consumer<Object> log) throws InterruptedException {
+  /**
+   * Performs a {@link TimeUnit#sleep(long)} using this {@link TimeDuration}.
+   *
+   * @param log If not null, use it to print log messages.
+   * @return the difference of the actual sleep time duration and this {@link TimeDuration}.
+   */
+  public TimeDuration sleep(Consumer<Object> log) throws InterruptedException {
     if (log != null) {
       log.accept(StringUtils.stringSupplierAsObject(() -> "Start sleeping " + this));
     }
+    final Timestamp start = Timestamp.currentTime();
     try {
       unit.sleep(duration);
       if (log != null) {
@@ -320,6 +327,7 @@ public final class TimeDuration implements Comparable<TimeDuration> {
       }
       throw ie;
     }
+    return start.elapsedTime().subtract(this);
   }
 
   @Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java b/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
index 8ce45ae..ba5fb8c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
@@ -67,6 +67,15 @@ public final class Timestamp implements Comparable<Timestamp> {
   }
 
   /**
+   * @param t the time period to be added.
+   * @return a new {@link Timestamp} whose value is calculated
+   *         by adding the given time duration to this timestamp.
+   */
+  public Timestamp addTime(TimeDuration t) {
+    return new Timestamp(nanos + t.to(TimeUnit.NANOSECONDS).getDuration());
+  }
+
+  /**
    * @return the elapsed time in milliseconds.
    *         If the timestamp is a future time, the returned value is negative.
    */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java b/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java
deleted file mode 100644
index 9470110..0000000
--- a/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.server;
-
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
-import org.apache.ratis.thirdparty.com.google.common.base.Joiner;
-import org.apache.ratis.thirdparty.com.google.common.base.Stopwatch;
-import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
-import org.apache.ratis.thirdparty.com.google.common.collect.Maps;
-import org.apache.ratis.thirdparty.com.google.common.collect.Sets;
-import org.apache.ratis.util.Daemon;
-import org.apache.ratis.util.JavaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-
-public class JvmPauseMonitor {
-  public static final Logger LOG =
-    LoggerFactory.getLogger(JvmPauseMonitor.class);
-  /**
-   * The target sleep time
-   */
-  private static final long SLEEP_INTERVAL_MS = 500;
-
-  private Thread monitorThread;
-  private volatile boolean shouldRun = true;
-  private final RaftServerProxy proxy;
-
-  private String formatMessage(long extraSleepTime,
-                               Map<String, GcTimes> gcTimesAfterSleep,
-                               Map<String, GcTimes> gcTimesBeforeSleep) {
-
-    Set<String> gcBeanNames = Sets.intersection(gcTimesAfterSleep.keySet(),
-      gcTimesBeforeSleep.keySet());
-    List<String> gcDiffs = Lists.newArrayList();
-    for (String name : gcBeanNames) {
-      GcTimes diff = gcTimesAfterSleep.get(name)
-                       .subtract(gcTimesBeforeSleep.get(name));
-      if (diff.gcCount != 0) {
-        gcDiffs.add("GC pool '" + name + "' had collection(s): " + diff);
-      }
-    }
-
-    String ret = "Detected pause in JVM or host machine (eg GC): "
-                   + "pause of approximately " + extraSleepTime + "ms\n";
-    if (gcDiffs.isEmpty()) {
-      ret += "No GCs detected";
-    } else {
-      ret += Joiner.on("\n").join(gcDiffs);
-    }
-    return ret;
-  }
-
-  public JvmPauseMonitor(RaftServerProxy proxy) {
-    this.proxy = proxy;
-  }
-
-  private Map<String, GcTimes> getGcTimes() {
-    Map<String, GcTimes> map = Maps.newHashMap();
-    List<GarbageCollectorMXBean> gcBeans =
-      ManagementFactory.getGarbageCollectorMXBeans();
-    for (GarbageCollectorMXBean gcBean : gcBeans) {
-      map.put(gcBean.getName(), new GcTimes(gcBean));
-    }
-    return map;
-  }
-
-  private static final class GcTimes {
-    private GcTimes(GarbageCollectorMXBean gcBean) {
-      gcCount = gcBean.getCollectionCount();
-      gcTimeMillis = gcBean.getCollectionTime();
-    }
-
-    private GcTimes(long count, long time) {
-      this.gcCount = count;
-      this.gcTimeMillis = time;
-    }
-
-    private GcTimes subtract(GcTimes other) {
-      return new GcTimes(this.gcCount - other.gcCount,
-        this.gcTimeMillis - other.gcTimeMillis);
-    }
-
-    @Override
-    public String toString() {
-      return "count=" + gcCount + " time=" + gcTimeMillis + "ms";
-    }
-
-    private long gcCount;
-    private long gcTimeMillis;
-  }
-
-  class Monitor implements Runnable {
-    private final String name = JvmPauseMonitor.this + "-" + JavaUtils
-                                                               .getClassSimpleName(getClass());
-
-    public void run() {
-      int leaderStepDownWaitTime =
-        RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(proxy.getProperties()).toIntExact(TimeUnit
-                                                                                                       .MILLISECONDS);
-      int rpcSlownessTimeoutMs = RaftServerConfigKeys.Rpc.slownessTimeout(proxy.getProperties()).toIntExact(
-        TimeUnit.MILLISECONDS);
-      Stopwatch sw = Stopwatch.createUnstarted();
-      Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
-      LOG.info("Starting Ratis JVM pause monitor");
-      while (shouldRun) {
-        sw.reset().start();
-        try {
-          Thread.sleep(SLEEP_INTERVAL_MS);
-        } catch (InterruptedException ie) {
-          return;
-        }
-        long extraSleepTime = sw.elapsed(TimeUnit.MILLISECONDS) - SLEEP_INTERVAL_MS;
-        Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
-
-        if (extraSleepTime > 100) {
-          LOG.warn(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
-        }
-        try {
-          if (extraSleepTime > rpcSlownessTimeoutMs) {
-            // close down all pipelines if the total gc period exceeds
-            // rpc slowness timeout
-            proxy.close();
-          } else if (extraSleepTime > leaderStepDownWaitTime) {
-            proxy.getImpls().forEach(RaftServerImpl::stepDownOnJvmPause);
-          }
-        } catch (IOException ioe) {
-          LOG.info("Encountered exception in JvmPauseMonitor {}", ioe);
-        }
-      }
-    }
-
-    @Override
-    public String toString() {
-      return name;
-    }
-  }
-
-
-  public void start() {
-    monitorThread = new Daemon(new Monitor());
-    monitorThread.start();
-  }
-
-  public void stop() {
-    shouldRun = false;
-    if (monitorThread != null) {
-      monitorThread.interrupt();
-      try {
-        monitorThread.join();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index e652408..9543380 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -62,10 +62,10 @@ public interface RaftServerConfigKeys {
   }
 
   String SLEEP_DEVIATION_THRESHOLD_KEY = PREFIX + ".sleep.deviation.threshold";
-  int SLEEP_DEVIATION_THRESHOLD_DEFAULT = 300;
-  static int sleepDeviationThreshold(RaftProperties properties) {
-    return getInt(properties::getInt, SLEEP_DEVIATION_THRESHOLD_KEY,
-        SLEEP_DEVIATION_THRESHOLD_DEFAULT, getDefaultLog());
+  TimeDuration SLEEP_DEVIATION_THRESHOLD_DEFAULT = TimeDuration.valueOf(300, TimeUnit.MILLISECONDS);
+  static TimeDuration sleepDeviationThreshold(RaftProperties properties) {
+    return getTimeDuration(properties.getTimeDuration(SLEEP_DEVIATION_THRESHOLD_DEFAULT.getUnit()),
+        SLEEP_DEVIATION_THRESHOLD_KEY, SLEEP_DEVIATION_THRESHOLD_DEFAULT, getDefaultLog());
   }
   static void setSleepDeviationThreshold(RaftProperties properties, int thresholdMs) {
     setInt(properties::setInt, SLEEP_DEVIATION_THRESHOLD_KEY, thresholdMs);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index 7c08a0e..b3173bb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -109,11 +109,14 @@ class FollowerState extends Daemon {
 
   @Override
   public  void run() {
-    long sleepDeviationThresholdMs = server.getSleepDeviationThresholdMs();
+    final TimeDuration sleepDeviationThreshold = server.getSleepDeviationThreshold();
     while (isRunning && server.isFollower()) {
-      final long electionTimeout = server.getRandomTimeoutMs();
+      final TimeDuration electionTimeout = server.getRandomElectionTimeout();
       try {
-        if (!JavaUtils.sleep(electionTimeout, sleepDeviationThresholdMs)) {
+        final TimeDuration extraSleep = electionTimeout.sleep();
+        if (extraSleep.compareTo(sleepDeviationThreshold) > 0) {
+          LOG.warn("Unexpected long sleep: sleep {} but took extra {} (> threshold = {})",
+              electionTimeout, extraSleep, sleepDeviationThreshold);
           continue;
         }
 
@@ -123,10 +126,11 @@ class FollowerState extends Daemon {
           break;
         }
         synchronized (server) {
-          if (outstandingOp.get() == 0 && lastRpcTime.elapsedTimeMs() >= electionTimeout
+          if (outstandingOp.get() == 0
+              && lastRpcTime.elapsedTime().compareTo(electionTimeout) >= 0
               && !lostMajorityHeartbeatsRecently()) {
-            LOG.info("{}: change to CANDIDATE, lastRpcTime:{}ms, electionTimeout:{}ms",
-                this, lastRpcTime.elapsedTimeMs(), electionTimeout);
+            LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}, electionTimeout:{}",
+                this, lastRpcTime.elapsedTime(), electionTimeout);
             server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters.
             // election timeout, should become a candidate
             server.changeToCandidate();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index dbbefa8..1a31d66 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -285,7 +285,7 @@ class LeaderElection implements Runnable {
 
   private ResultAndTerm waitForResults(final long electionTerm, final int submitted,
       RaftConfiguration conf, Executor voteExecutor) throws InterruptedException {
-    final Timestamp timeout = Timestamp.currentTime().addTimeMs(server.getRandomTimeoutMs());
+    final Timestamp timeout = Timestamp.currentTime().addTime(server.getRandomElectionTimeout());
     final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
     final List<Exception> exceptions = new ArrayList<>();
     int waitForNum = submitted;
@@ -336,9 +336,7 @@ class LeaderElection implements Runnable {
 
         // remove higher priority peer, so that we check higherPriorityPeers empty to make sure
         // all higher priority peers have replied
-        if (higherPriorityPeers.contains(replierId)) {
-          higherPriorityPeers.remove(replierId);
-        }
+        higherPriorityPeers.remove(replierId);
 
         if (r.getServerReply().getSuccess()) {
           votedPeers.add(replierId);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index a1dcc36..1393b0e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -91,7 +91,7 @@ public class RaftServerImpl implements RaftServer.Division,
   private final int maxTimeoutMs;
   private final TimeDuration leaderStepDownWaitTime;
   private final int rpcSlownessTimeoutMs;
-  private final int sleepDeviationThresholdMs;
+  private final TimeDuration sleepDeviationThreshold;
   private final boolean installSnapshotEnabled;
 
   private final LifeCycle lifeCycle;
@@ -128,7 +128,7 @@ public class RaftServerImpl implements RaftServer.Division,
     maxTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMax(properties).toIntExact(TimeUnit.MILLISECONDS);
     rpcSlownessTimeoutMs = RaftServerConfigKeys.Rpc.slownessTimeout(properties).toIntExact(TimeUnit.MILLISECONDS);
     leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
-    sleepDeviationThresholdMs = RaftServerConfigKeys.sleepDeviationThreshold(properties);
+    this.sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties);
     installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
     Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs,
         "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
@@ -168,16 +168,17 @@ public class RaftServerImpl implements RaftServer.Division,
     return rpcSlownessTimeoutMs;
   }
 
-  int getRandomTimeoutMs() {
-    return minTimeoutMs + ThreadLocalRandom.current().nextInt(maxTimeoutMs - minTimeoutMs + 1);
+  TimeDuration getRandomElectionTimeout() {
+    final long millis = minTimeoutMs + ThreadLocalRandom.current().nextInt(maxTimeoutMs - minTimeoutMs + 1);
+    return TimeDuration.valueOf(millis, TimeUnit.MILLISECONDS);
   }
 
   TimeDuration getLeaderStepDownWaitTime() {
     return leaderStepDownWaitTime;
   }
 
-  int getSleepDeviationThresholdMs() {
-    return sleepDeviationThresholdMs;
+  TimeDuration getSleepDeviationThreshold() {
+    return sleepDeviationThreshold;
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index f1dbe2f..b18b07b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -35,7 +35,7 @@ import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.JvmPauseMonitor;
+import org.apache.ratis.util.JvmPauseMonitor;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.statemachine.StateMachine;
@@ -44,6 +44,7 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.function.CheckedFunction;
 
 import java.io.Closeable;
@@ -168,6 +169,7 @@ public class RaftServerProxy implements RaftServer {
 
   private final ImplMap impls = new ImplMap();
   private final ExecutorService implExecutor = Executors.newSingleThreadExecutor();
+
   private final JvmPauseMonitor pauseMonitor;
 
   RaftServerProxy(RaftPeerId id, StateMachine.Registry stateMachineRegistry,
@@ -184,7 +186,20 @@ public class RaftServerProxy implements RaftServer {
     this.lifeCycle = new LifeCycle(this.id + "-" + JavaUtils.getClassSimpleName(getClass()));
 
     this.dataStreamServerRpc = new DataStreamServerImpl(this, parameters).getServerRpc();
-    this.pauseMonitor = new JvmPauseMonitor(this);
+
+    final TimeDuration rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties);
+    final TimeDuration leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
+    this.pauseMonitor = new JvmPauseMonitor(id,
+        extraSleep -> handleJvmPause(extraSleep, rpcSlownessTimeout, leaderStepDownWaitTime));
+  }
+
+  private void handleJvmPause(TimeDuration extraSleep, TimeDuration closeThreshold, TimeDuration stepDownThreshold)
+      throws IOException {
+    if (extraSleep.compareTo(closeThreshold) > 0) {
+      close();
+    } else if (extraSleep.compareTo(stepDownThreshold) > 0) {
+      getImpls().forEach(RaftServerImpl::stepDownOnJvmPause);
+    }
   }
 
   /** Check the storage dir and add groups*/