You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/17 09:39:39 UTC

[incubator-ratis] branch master updated: RATIS-1149. Leader should step down and initiate a stateMachine action in case of jvm pauses. (#275)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 190b0f7  RATIS-1149. Leader should step down and initiate a stateMachine action in case of jvm pauses. (#275)
190b0f7 is described below

commit 190b0f7107b4ff1d801535e798d988412616a773
Author: bshashikant <sh...@apache.org>
AuthorDate: Tue Nov 17 15:09:27 2020 +0530

    RATIS-1149. Leader should step down and initiate a stateMachine action in case of jvm pauses. (#275)
    
    * RATIS-1149. Leader should step down and initiate a stateMachine action in case of jvm pauses.
    
    * Addressed review comments.
    
    * Addressed While loop.
    
    * Addressed checkstyle issues.
---
 .../org/apache/ratis/server/JvmPauseMonitor.java   | 180 +++++++++++++++++++++
 .../org/apache/ratis/server/impl/LeaderState.java  |   2 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |   6 +
 .../apache/ratis/server/impl/RaftServerProxy.java  |   7 +-
 4 files changed, 193 insertions(+), 2 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java b/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java
new file mode 100644
index 0000000..accd1fc
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server;
+
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.thirdparty.com.google.common.base.Joiner;
+import org.apache.ratis.thirdparty.com.google.common.base.Stopwatch;
+import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
+import org.apache.ratis.thirdparty.com.google.common.collect.Maps;
+import org.apache.ratis.thirdparty.com.google.common.collect.Sets;
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.JavaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+
+public class JvmPauseMonitor {
+  public static final Logger LOG =
+    LoggerFactory.getLogger(JvmPauseMonitor.class);
+  /**
+   * The target sleep time
+   */
+  private static final long SLEEP_INTERVAL_MS = 500;
+
+  private Thread monitorThread;
+  private volatile boolean shouldRun = true;
+  private final RaftServerProxy proxy;
+
+  private String formatMessage(long extraSleepTime,
+                               Map<String, GcTimes> gcTimesAfterSleep,
+                               Map<String, GcTimes> gcTimesBeforeSleep) {
+
+    Set<String> gcBeanNames = Sets.intersection(gcTimesAfterSleep.keySet(),
+      gcTimesBeforeSleep.keySet());
+    List<String> gcDiffs = Lists.newArrayList();
+    for (String name : gcBeanNames) {
+      GcTimes diff = gcTimesAfterSleep.get(name)
+                       .subtract(gcTimesBeforeSleep.get(name));
+      if (diff.gcCount != 0) {
+        gcDiffs.add("GC pool '" + name + "' had collection(s): " + diff);
+      }
+    }
+
+    String ret = "Detected pause in JVM or host machine (eg GC): "
+                   + "pause of approximately " + extraSleepTime + "ms\n";
+    if (gcDiffs.isEmpty()) {
+      ret += "No GCs detected";
+    } else {
+      ret += Joiner.on("\n").join(gcDiffs);
+    }
+    return ret;
+  }
+
+  public JvmPauseMonitor(RaftServerProxy proxy) {
+    this.proxy = proxy;
+  }
+
+  private Map<String, GcTimes> getGcTimes() {
+    Map<String, GcTimes> map = Maps.newHashMap();
+    List<GarbageCollectorMXBean> gcBeans =
+      ManagementFactory.getGarbageCollectorMXBeans();
+    for (GarbageCollectorMXBean gcBean : gcBeans) {
+      map.put(gcBean.getName(), new GcTimes(gcBean));
+    }
+    return map;
+  }
+
+  private static final class GcTimes {
+    private GcTimes(GarbageCollectorMXBean gcBean) {
+      gcCount = gcBean.getCollectionCount();
+      gcTimeMillis = gcBean.getCollectionTime();
+    }
+
+    private GcTimes(long count, long time) {
+      this.gcCount = count;
+      this.gcTimeMillis = time;
+    }
+
+    private GcTimes subtract(GcTimes other) {
+      return new GcTimes(this.gcCount - other.gcCount,
+        this.gcTimeMillis - other.gcTimeMillis);
+    }
+
+    @Override
+    public String toString() {
+      return "count=" + gcCount + " time=" + gcTimeMillis + "ms";
+    }
+
+    private long gcCount;
+    private long gcTimeMillis;
+  }
+
+  class Monitor implements Runnable {
+    private final String name = JvmPauseMonitor.this + "-" + JavaUtils
+                                                               .getClassSimpleName(getClass());
+
+    public void run() {
+      int leaderStepDownWaitTime =
+        RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(proxy.getProperties()).toIntExact(TimeUnit
+                                                                                                       .MILLISECONDS);
+      int rpcSlownessTimeoutMs = RaftServerConfigKeys.Rpc.slownessTimeout(proxy.getProperties()).toIntExact(
+        TimeUnit.MILLISECONDS);
+      Stopwatch sw = Stopwatch.createUnstarted();
+      Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
+      LOG.info("Starting Ratis JVM pause monitor");
+      while (shouldRun) {
+        sw.reset().start();
+        try {
+          Thread.sleep(SLEEP_INTERVAL_MS);
+        } catch (InterruptedException ie) {
+          return;
+        }
+        long extraSleepTime = sw.elapsed(TimeUnit.MILLISECONDS) - SLEEP_INTERVAL_MS;
+        Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
+
+        if (extraSleepTime > 0) {
+          LOG.warn(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
+        }
+        try {
+          if (extraSleepTime > rpcSlownessTimeoutMs) {
+            // close down all pipelines if the total gc period exceeds
+            // rpc slowness timeout
+            proxy.close();
+          } else if (extraSleepTime > leaderStepDownWaitTime) {
+            proxy.getImpls().forEach(RaftServerImpl::stepDownOnJvmPause);
+          }
+        } catch (IOException ioe) {
+          LOG.info("Encountered exception in JvmPauseMonitor {}", ioe);
+        }
+      }
+    }
+
+    @Override
+    public String toString() {
+      return name;
+    }
+  }
+
+
+  public void start() {
+    monitorThread = new Daemon(new Monitor());
+    monitorThread.start();
+  }
+
+  public void stop() {
+    shouldRun = false;
+    if (monitorThread != null) {
+      monitorThread.interrupt();
+      try {
+        monitorThread.join();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 1e7f348..fa591cc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -69,7 +69,7 @@ public class LeaderState {
   }
 
   enum StepDownReason {
-    HIGHER_TERM, HIGHER_PRIORITY, LOST_MAJORITY_HEARTBEATS, STATE_MACHINE_EXCEPTION;
+    HIGHER_TERM, HIGHER_PRIORITY, LOST_MAJORITY_HEARTBEATS, STATE_MACHINE_EXCEPTION, JVM_PAUSE;
 
     private final String longName = JavaUtils.getClassSimpleName(getClass()) + ":" + name();
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f6bbb84..2cfb4d3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -633,6 +633,12 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     return pending.getFuture();
   }
 
+  /**
+   * Invoked by the JVM pause monitor after a long pause. If this server is
+   * currently the leader, submits a step-down event with reason JVM_PAUSE;
+   * otherwise does nothing.
+   */
+  public void stepDownOnJvmPause() {
+    if (!isLeader()) {
+      return;
+    }
+    role.getLeaderState()
+        .ifPresent(leaderState -> leaderState.submitStepDownEvent(LeaderState.StepDownReason.JVM_PAUSE));
+  }
+
   @Override
   public CompletableFuture<RaftClientReply> submitClientRequestAsync(
       RaftClientRequest request) throws IOException {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index f29cbad..9364811 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -35,6 +35,7 @@ import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.JvmPauseMonitor;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.statemachine.StateMachine;
@@ -169,6 +170,7 @@ public class RaftServerProxy implements RaftServer {
 
   private final ImplMap impls = new ImplMap();
   private final ExecutorService implExecutor = Executors.newSingleThreadExecutor();
+  private final JvmPauseMonitor pauseMonitor;
 
   RaftServerProxy(RaftPeerId id, StateMachine.Registry stateMachineRegistry,
       RaftProperties properties, Parameters parameters) {
@@ -184,6 +186,7 @@ public class RaftServerProxy implements RaftServer {
     this.lifeCycle = new LifeCycle(this.id + "-" + JavaUtils.getClassSimpleName(getClass()));
 
     this.dataStreamServerRpc = new DataStreamServerImpl(this, parameters).getServerRpc();
+    this.pauseMonitor = new JvmPauseMonitor(this);
   }
 
   /** Check the storage dir and add groups*/
@@ -292,7 +295,7 @@ public class RaftServerProxy implements RaftServer {
     return IOUtils.getFromFuture(getImplFuture(groupId), this::getId);
   }
 
-  List<RaftServerImpl> getImpls() throws IOException {
+  public List<RaftServerImpl> getImpls() throws IOException {
     final List<RaftServerImpl> list = new ArrayList<>();
     for(CompletableFuture<RaftServerImpl> f : impls.getAll()) {
       list.add(IOUtils.getFromFuture(f, this::getId));
@@ -319,6 +322,7 @@ public class RaftServerProxy implements RaftServer {
       getServerRpc().start();
       getDataStreamServerRpc().start();
     }, IOException.class);
+    pauseMonitor.start();
   }
 
   @Override
@@ -346,6 +350,7 @@ public class RaftServerProxy implements RaftServer {
         LOG.warn(getId() + ": Failed to close " + SupportedDataStreamType.NETTY + " server", ignored);
       }
     });
+    pauseMonitor.stop();
   }
 
   private <REPLY> CompletableFuture<REPLY> submitRequest(RaftGroupId groupId,