You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2019/03/28 10:25:02 UTC

[incubator-ratis] branch branch-0.3 updated: RATIS-503. RaftLog is accessed by LeaderElection after closed.

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new d6985c4  RATIS-503. RaftLog is accessed by LeaderElection after closed.
d6985c4 is described below

commit d6985c4d6f290f5bbe1e2a100875dca785ebe35f
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Mar 28 18:24:02 2019 +0800

    RATIS-503. RaftLog is accessed by LeaderElection after closed.
---
 .../main/java/org/apache/ratis/util/Daemon.java    |   9 +-
 .../main/java/org/apache/ratis/util/LifeCycle.java |  49 ++++--
 .../main/java/org/apache/ratis/util/LogUtils.java  |  12 ++
 .../java/org/apache/ratis/util/PeerProxyMap.java   |   2 +-
 .../java/org/apache/ratis/util/TimeDuration.java   |   5 +
 .../apache/ratis/server/impl/LeaderElection.java   | 173 +++++++++++++--------
 .../apache/ratis/server/impl/RaftServerImpl.java   |   4 +-
 .../org/apache/ratis/server/impl/RoleInfo.java     |   2 +-
 .../org/apache/ratis/server/impl/ServerState.java  |  33 ++--
 .../apache/ratis/server/protocol/TermIndex.java    |   7 +-
 .../server/simulation/SimulatedServerRpc.java      |  23 ++-
 11 files changed, 219 insertions(+), 100 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
index 1ef95ae..b311854 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -29,7 +29,12 @@ public class Daemon extends Thread {
 
   /** Construct a daemon thread with the given runnable. */
   public Daemon(Runnable runnable) {
+    this(runnable, runnable.toString());
+  }
+
+  /** Construct a daemon thread with the given runnable and the given thread name. */
+  public Daemon(Runnable runnable, String name) {
     super(runnable);
-    this.setName(runnable.toString());
+    this.setName(name);
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
index dd91c28..200a99e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.util;
 
 import org.apache.ratis.util.function.CheckedRunnable;
+import org.apache.ratis.util.function.CheckedSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,6 +80,16 @@ public class LifeCycle {
       return false;
     }
 
+    /** Is this {@link State#RUNNING}? */
+    public boolean isRunning() {
+      return this == RUNNING;
+    }
+
+    /** Is this {@link State#CLOSING} or {@link State#CLOSED}? */
+    public boolean isClosingOrClosed() {
+      return this == State.CLOSING || this == State.CLOSED;
+    }
+
     static void put(State key, Map<State, List<State>> map, State... values) {
       map.put(key, Collections.unmodifiableList(Arrays.asList(values)));
     }
@@ -197,31 +208,47 @@ public class LifeCycle {
     }
   }
 
+  /**
+   * Check the current state and, if applicable, transition to {@link State#CLOSING}.
+   * If this is already in {@link State#CLOSING} or {@link State#CLOSED},
+   * then invoking this method has no effect.
+   * In other words, this method can be safely called multiple times.
+   */
+  public State checkStateAndClose() {
+    return checkStateAndClose(() -> State.CLOSING);
+  }
 
   /**
    * Check the current state and, if applicable, run the given close method.
-   * This method can be called multiple times
+   * If this is already in {@link State#CLOSING} or {@link State#CLOSED},
+   * then invoking this method has no effect.
+   * In other words, this method can be safely called multiple times
    * while the given close method will only be executed at most once.
    */
-  public <T extends Throwable> void checkStateAndClose(
-      CheckedRunnable<T> closeImpl) throws T {
+  public <T extends Throwable> State checkStateAndClose(CheckedRunnable<T> closeMethod) throws T {
+    return checkStateAndClose(() -> {
+      try {
+        closeMethod.run();
+      } finally {
+        transition(State.CLOSED);
+      }
+      return State.CLOSED;
+    });
+  }
+
+  private <T extends Throwable> State checkStateAndClose(CheckedSupplier<State, T> closeMethod) throws T {
     if (compareAndTransition(State.NEW, State.CLOSED)) {
-      return;
+      return State.CLOSED;
     }
 
     for(;;) {
       final State c = getCurrentState();
       if (c.isOneOf(State.CLOSING, State.CLOSED)) {
-        return; //already closing or closed.
+        return c; //already closing or closed.
       }
 
       if (compareAndTransition(c, State.CLOSING)) {
-        try {
-          closeImpl.run();
-        } finally {
-          transition(State.CLOSED);
-        }
-        return;
+        return closeMethod.get();
       }
 
       // lifecycle state is changed, retry.
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
index d30bae1..92a03cc 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
@@ -137,4 +137,16 @@ public interface LogUtils {
       }
     }
   }
+
+  static void infoOrTrace(Logger log, String message, Throwable t) {
+    infoOrTrace(log, () -> message, t);
+  }
+
+  static void infoOrTrace(Logger log, Supplier<String> message, Throwable t) {
+    if (log.isTraceEnabled()) {
+      log.trace(message.get(), t);
+    } else {
+      log.info("{}: {}", message.get(), t);
+    }
+  }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index b2720ff..014dc30 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -54,7 +54,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
         synchronized (this) {
           if (proxy == null) {
             final LifeCycle.State current = lifeCycle.getCurrentState();
-            if (current.isOneOf(LifeCycle.State.CLOSING, LifeCycle.State.CLOSED)) {
+            if (current.isClosingOrClosed()) {
               throw new AlreadyClosedException(name + " is already " + current);
             }
             lifeCycle.startAndTransition(
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index a5ebd55..b5205f3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -196,6 +196,11 @@ public final class TimeDuration implements Comparable<TimeDuration> {
     return duration < 0;
   }
 
+  /** @return Is this {@link TimeDuration} less than or equal to zero? */
+  public boolean isNonPositive() {
+    return duration <= 0;
+  }
+
   /** Performs a {@link TimeUnit#sleep(long)} using this {@link TimeDuration}. */
   public void sleep() throws InterruptedException {
     unit.sleep(duration);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index fd02632..796f5e2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -17,15 +17,18 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
-import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.Timestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,23 +37,28 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-class LeaderElection extends Daemon {
+class LeaderElection implements Runnable {
   public static final Logger LOG = LoggerFactory.getLogger(LeaderElection.class);
 
   private ResultAndTerm logAndReturn(Result result,
       List<RequestVoteReplyProto> responses,
       List<Exception> exceptions, long newTerm) {
-    LOG.info(server.getId() + ": Election " + result + "; received "
-        + responses.size() + " response(s) "
+    LOG.info(this + ": Election " + result + "; received " + responses.size() + " response(s) "
         + responses.stream().map(ProtoUtils::toString).collect(Collectors.toList())
         + " and " + exceptions.size() + " exception(s); " + server.getState());
     int i = 0;
     for(Exception e : exceptions) {
-      LOG.info("  " + i++ + ": " + e);
-      LOG.trace("TRACE", e);
+      final int j = i++;
+      LogUtils.infoOrTrace(LOG, () -> "  Exception " + j, e);
     }
     return new ResultAndTerm(result, newTerm);
   }
@@ -67,33 +75,52 @@ class LeaderElection extends Daemon {
     }
   }
 
+  static class Executor {
+    private final ExecutorCompletionService<RequestVoteReplyProto> service;
+    private final ExecutorService executor;
+
+    private final AtomicInteger count = new AtomicInteger();
+
+    Executor(Object name, int size) {
+      Preconditions.assertTrue(size > 0);
+      executor = Executors.newFixedThreadPool(size, r -> new Daemon(r, name + "-" + count.incrementAndGet()));
+      service = new ExecutorCompletionService<>(executor);
+    }
+
+    void shutdown() {
+      executor.shutdown();
+    }
+
+    void submit(Callable<RequestVoteReplyProto> task) {
+      service.submit(task);
+    }
+
+    Future<RequestVoteReplyProto> poll(TimeDuration waitTime) throws InterruptedException {
+      return service.poll(waitTime.getDuration(), waitTime.getUnit());
+    }
+  }
+
+  private static final AtomicInteger COUNT = new AtomicInteger();
+
+  private final String name;
+  private final LifeCycle lifeCycle;
+  private final Daemon daemon;
+
   private final RaftServerImpl server;
-  private ExecutorCompletionService<RequestVoteReplyProto> service;
-  private ExecutorService executor;
-  private volatile boolean running;
-  /**
-   * The Raft configuration should not change while the peer is in candidate
-   * state. If the configuration changes, another peer should be acting as a
-   * leader and this LeaderElection session should end.
-   */
-  private final RaftConfiguration conf;
-  private final Collection<RaftPeer> others;
 
   LeaderElection(RaftServerImpl server) {
+    this.name = server.getId() + ":" + server.getGroupId() + ":" + getClass().getSimpleName() + COUNT.incrementAndGet();
+    this.lifeCycle = new LifeCycle(this);
+    this.daemon = new Daemon(this);
     this.server = server;
-    conf = server.getRaftConf();
-    others = conf.getOtherPeers(server.getId());
-    this.running = true;
   }
 
-  void stopRunning() {
-    this.running = false;
+  void start() {
+    lifeCycle.startAndTransition(daemon::start);
   }
 
-  private void initExecutor() {
-    Preconditions.assertTrue(!others.isEmpty());
-    executor = Executors.newFixedThreadPool(others.size(), Daemon::new);
-    service = new ExecutorCompletionService<>(executor);
+  void shutdown() {
+    lifeCycle.checkStateAndClose();
   }
 
   @Override
@@ -103,29 +130,54 @@ class LeaderElection extends Daemon {
     } catch (InterruptedException e) {
       // the leader election thread is interrupted. The peer may already step
       // down to a follower. The leader election should skip.
-      LOG.info(server.getId() + " " + getClass().getSimpleName()
-          + " thread is interrupted gracefully; server=" + server);
-    } catch (IOException e) {
-      LOG.warn("Failed to persist votedFor/term. Exit the leader election.", e);
-      stopRunning();
+      LOG.info("{} thread is interrupted gracefully; server={}", this, server);
+    } catch(Throwable e) {
+      final LifeCycle.State state = lifeCycle.getCurrentState();
+      final String message = "Failed " + this + ", state=" + state;
+
+      if (state.isClosingOrClosed()) {
+        LogUtils.infoOrTrace(LOG, message, e);
+        LOG.info("{}: {} is safely ignored since this is already {}",
+            this, e.getClass().getSimpleName(), state);
+      } else {
+        if (!server.isAlive()) {
+          LogUtils.infoOrTrace(LOG, message, e);
+          LOG.info("{}: {} is safely ignored since the server is not alive: {}",
+              this, e.getClass().getSimpleName(), server);
+        } else {
+          LOG.error(message, e);
+        }
+        shutdown();
+      }
+    } finally {
+      lifeCycle.transition(LifeCycle.State.CLOSED);
     }
   }
 
+  private boolean shouldRun() {
+    return lifeCycle.getCurrentState().isRunning() && server.isCandidate() && server.isAlive();
+  }
+
+  private boolean shouldRun(long electionTerm) {
+    return shouldRun() && server.getState().getCurrentTerm() == electionTerm;
+  }
+
   /**
    * After a peer changes its role to candidate, it invokes this method to
    * send out requestVote rpc to all other peers.
    */
   private void askForVotes() throws InterruptedException, IOException {
     final ServerState state = server.getState();
-    while (running && server.isCandidate()) {
+    while (shouldRun()) {
       // one round of requestVotes
       final long electionTerm;
+      final RaftConfiguration conf;
       synchronized (server) {
         electionTerm = state.initElection();
-        server.getState().persistMetadata();
+        conf = state.getRaftConf();
+        state.persistMetadata();
       }
-      LOG.info(state.getSelfId() + ": begin an election in Term "
-          + electionTerm);
+      LOG.info("{}: begin an election at term {} for {}", this, electionTerm, conf);
 
       TermIndex lastEntry = state.getLog().getLastEntryTermIndex();
       if (lastEntry == null) {
@@ -137,24 +189,22 @@ class LeaderElection extends Daemon {
       }
 
       final ResultAndTerm r;
+      final Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
       if (others.isEmpty()) {
         r = new ResultAndTerm(Result.PASSED, electionTerm);
       } else {
+        final Executor voteExecutor = new Executor(this, others.size());
         try {
-          initExecutor();
-          int submitted = submitRequests(electionTerm, lastEntry);
-          r = waitForResults(electionTerm, submitted);
+          final int submitted = submitRequests(electionTerm, lastEntry, others, voteExecutor);
+          r = waitForResults(electionTerm, submitted, conf, voteExecutor);
         } finally {
-          if (executor != null) {
-            executor.shutdown();
-          }
+          voteExecutor.shutdown();
         }
       }
 
       synchronized (server) {
-        if (electionTerm != state.getCurrentTerm() || !running ||
-            !server.isCandidate()) {
-          return; // term already passed or no longer a candidate.
+        if (!shouldRun(electionTerm)) {
+          return; // term already passed or this should not run anymore.
         }
 
         switch (r.result) {
@@ -162,14 +212,12 @@ class LeaderElection extends Daemon {
             server.changeToLeader();
             return;
           case SHUTDOWN:
-            LOG.info("{} received shutdown response when requesting votes.",
-                server.getId());
+            LOG.info("{} received shutdown response when requesting votes.", this);
             server.getProxy().close();
             return;
           case REJECTED:
           case DISCOVERED_A_NEW_TERM:
-            final long term = r.term > server.getState().getCurrentTerm() ?
-                r.term : server.getState().getCurrentTerm();
+            final long term = Math.max(r.term, state.getCurrentTerm());
             server.changeToFollowerAndPersistMetadata(term, Result.DISCOVERED_A_NEW_TERM);
             return;
           case TIMEOUT:
@@ -179,34 +227,33 @@ class LeaderElection extends Daemon {
     }
   }
 
-  private int submitRequests(final long electionTerm, final TermIndex lastEntry) {
+  private int submitRequests(final long electionTerm, final TermIndex lastEntry,
+      Collection<RaftPeer> others, Executor voteExecutor) {
     int submitted = 0;
     for (final RaftPeer peer : others) {
       final RequestVoteRequestProto r = server.createRequestVoteRequest(
           peer.getId(), electionTerm, lastEntry);
-      service.submit(
-          () -> server.getServerRpc().requestVote(r));
+      voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
       submitted++;
     }
     return submitted;
   }
 
-  private ResultAndTerm waitForResults(final long electionTerm,
-      final int submitted) throws InterruptedException {
+  private ResultAndTerm waitForResults(final long electionTerm, final int submitted,
+      RaftConfiguration conf, Executor voteExecutor) throws InterruptedException {
     final Timestamp timeout = Timestamp.currentTime().addTimeMs(server.getRandomTimeoutMs());
     final List<RequestVoteReplyProto> responses = new ArrayList<>();
     final List<Exception> exceptions = new ArrayList<>();
     int waitForNum = submitted;
     Collection<RaftPeerId> votedPeers = new ArrayList<>();
-    while (waitForNum > 0 && running && server.isCandidate()) {
-      final long waitTime = -timeout.elapsedTimeMs();
-      if (waitTime <= 0) {
+    while (waitForNum > 0 && shouldRun(electionTerm)) {
+      final TimeDuration waitTime = timeout.elapsedTime().apply(n -> -n);
+      if (waitTime.isNonPositive()) {
         return logAndReturn(Result.TIMEOUT, responses, exceptions, -1);
       }
 
       try {
-        final Future<RequestVoteReplyProto> future = service.poll(
-            waitTime, TimeUnit.MILLISECONDS);
+        final Future<RequestVoteReplyProto> future = voteExecutor.poll(waitTime);
         if (future == null) {
           continue; // poll timeout, continue to return Result.TIMEOUT
         }
@@ -227,8 +274,7 @@ class LeaderElection extends Daemon {
           }
         }
       } catch(ExecutionException e) {
-        LOG.info("{} got exception when requesting votes: {}", server.getId(), e);
-        LOG.trace("TRACE", e);
+        LogUtils.infoOrTrace(LOG, () -> this + " got exception when requesting votes", e);
         exceptions.add(e);
       }
       waitForNum--;
@@ -236,4 +282,9 @@ class LeaderElection extends Daemon {
     // received all the responses
     return logAndReturn(Result.REJECTED, responses, exceptions, -1);
   }
+
+  @Override
+  public String toString() {
+    return name;
+  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index fe26d45..fb57b6c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -52,8 +52,6 @@ import java.util.stream.Collectors;
 import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY;
 import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER;
 import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
-import static org.apache.ratis.util.LifeCycle.State.CLOSED;
-import static org.apache.ratis.util.LifeCycle.State.CLOSING;
 import static org.apache.ratis.util.LifeCycle.State.NEW;
 import static org.apache.ratis.util.LifeCycle.State.RUNNING;
 import static org.apache.ratis.util.LifeCycle.State.STARTING;
@@ -269,7 +267,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
   }
 
   public boolean isAlive() {
-    return !lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED);
+    return !lifeCycle.getCurrentState().isClosingOrClosed();
   }
 
   public boolean isFollower() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index 52f4b81..a27a086 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -132,7 +132,7 @@ class RoleInfo {
     final LeaderElection election = leaderElection.getAndSet(null);
     if (election != null) {
       LOG.info("{}: shutdown {}", id, election.getClass().getSimpleName());
-      election.stopRunning();
+      election.shutdown();
       // no need to interrupt the election thread
     }
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index ee4ede9..0a343b2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -41,6 +41,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
 import static org.apache.ratis.server.impl.RaftServerImpl.LOG;
@@ -65,10 +66,12 @@ public class ServerState implements Closeable {
   private final long leaderElectionTimeoutMs;
 
   /**
-   * Latest term server has seen. initialized to 0 on first boot, increases
-   * monotonically.
+   * Latest term server has seen.
+   * Initialized to 0 on first boot, increases monotonically.
+   *
+   * @see TermIndex#isValidTerm(int)
    */
-  private volatile long currentTerm;
+  private final AtomicLong currentTerm = new AtomicLong();
   /**
    * The server ID of the leader for this term. Null means either there is
    * no leader for this term yet or this server does not know who it is yet.
@@ -116,7 +119,7 @@ public class ServerState implements Closeable {
     log = initLog(id, prop, lastApplied, this::setRaftConf);
 
     RaftLog.Metadata metadata = log.loadMetadata();
-    currentTerm = metadata.getTerm();
+    currentTerm.set(metadata.getTerm());
     votedFor = metadata.getVotedFor();
 
     stateMachineUpdater = new StateMachineUpdater(stateMachine, server, log,
@@ -203,12 +206,12 @@ public class ServerState implements Closeable {
   }
 
   public long getCurrentTerm() {
-    return currentTerm;
+    return currentTerm.get();
   }
 
   boolean updateCurrentTerm(long newTerm) {
-    if (newTerm > currentTerm) {
-      currentTerm = newTerm;
+    final long current = currentTerm.getAndUpdate(curTerm -> Math.max(curTerm, newTerm));
+    if (newTerm > current) {
       votedFor = null;
       setLeader(null, "updateCurrentTerm");
       return true;
@@ -230,11 +233,11 @@ public class ServerState implements Closeable {
   long initElection() {
     votedFor = selfId;
     setLeader(null, "initElection");
-    return ++currentTerm;
+    return currentTerm.incrementAndGet();
   }
 
   void persistMetadata() throws IOException {
-    this.log.writeMetadata(currentTerm, votedFor);
+    this.log.writeMetadata(currentTerm.get(), votedFor);
   }
 
   /**
@@ -281,7 +284,7 @@ public class ServerState implements Closeable {
   }
 
   void appendLog(TransactionContext operation) throws StateMachineException {
-    log.append(currentTerm, operation);
+    log.append(currentTerm.get(), operation);
     Objects.requireNonNull(operation.getLogEntry());
   }
 
@@ -291,9 +294,10 @@ public class ServerState implements Closeable {
    * @return true if the check passes
    */
   boolean recognizeLeader(RaftPeerId leaderId, long leaderTerm) {
-    if (leaderTerm < currentTerm) {
+    final long current = currentTerm.get();
+    if (leaderTerm < current) {
       return false;
-    } else if (leaderTerm > currentTerm || this.leaderId == null) {
+    } else if (leaderTerm > current || this.leaderId == null) {
       // If the request indicates a term that is greater than the current term
       // or no leader has been set for the current term, make sure to update
       // leader and term later
@@ -309,9 +313,10 @@ public class ServerState implements Closeable {
     if (!getRaftConf().containsInConf(candidateId)) {
       return false;
     }
-    if (candidateTerm > currentTerm) {
+    final long current = currentTerm.get();
+    if (candidateTerm > current) {
       return true;
-    } else if (candidateTerm == currentTerm) {
+    } else if (candidateTerm == current) {
       // has not voted yet or this is a retry
       return votedFor == null || votedFor.equals(candidateId);
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java b/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
index fed14a3..ab9a030 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -31,6 +31,11 @@ public interface TermIndex extends Comparable<TermIndex> {
   /** @return the index. */
   long getIndex();
 
+  /** A term number is valid iff it is greater than zero. */
+  static boolean isValidTerm(int term) {
+    return term > 0;
+  }
+
   /** Create a new {@link TermIndex} instance. */
   static TermIndex newTermIndex(long term, long index) {
     return ServerImplUtils.newTermIndex(term, index);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index e9df14e..0a4243e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,15 +17,26 @@
  */
 package org.apache.ratis.server.simulation;
 
-import org.apache.ratis.protocol.*;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.protocol.GroupInfoRequest;
+import org.apache.ratis.protocol.GroupListRequest;
+import org.apache.ratis.protocol.GroupManagementRequest;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.impl.RaftServerProxy;
-import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.util.Daemon;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.LifeCycle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -126,7 +137,7 @@ class SimulatedServerRpc implements RaftServerRpc {
       = new RequestHandler.HandlerInterface<RaftServerRequest, RaftServerReply>() {
     @Override
     public boolean isAlive() {
-      return !server.getLifeCycleState().isOneOf(LifeCycle.State.CLOSING, LifeCycle.State.CLOSED);
+      return !server.getLifeCycleState().isClosingOrClosed();
     }
 
     @Override
@@ -148,7 +159,7 @@ class SimulatedServerRpc implements RaftServerRpc {
       = new RequestHandler.HandlerInterface<RaftClientRequest, RaftClientReply>() {
     @Override
     public boolean isAlive() {
-      return !server.getLifeCycleState().isOneOf(LifeCycle.State.CLOSING, LifeCycle.State.CLOSED);
+      return !server.getLifeCycleState().isClosingOrClosed();
     }
 
     @Override