You are viewing a plain-text version of this content. The link to its canonical version (originally the word "here") is not preserved in this rendering.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/12/03 01:26:43 UTC

incubator-ratis git commit: RATIS-443. FollowerState.inLogSync can be incorrectly cleared in appendEntriesAsync.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 609773e03 -> 9296a3bc0


RATIS-443. FollowerState.inLogSync can be incorrectly cleared in appendEntriesAsync.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/9296a3bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/9296a3bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/9296a3bc

Branch: refs/heads/master
Commit: 9296a3bc0b79cf73a6576e0b4c0b981d1ea3b221
Parents: 609773e
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Sun Dec 2 16:57:20 2018 -0800
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Sun Dec 2 16:57:20 2018 -0800

----------------------------------------------------------------------
 ratis-proto/src/main/proto/Raft.proto           |   2 +-
 .../apache/ratis/server/impl/FollowerState.java |  42 ++++--
 .../ratis/server/impl/LeaderElection.java       |   2 +-
 .../apache/ratis/server/impl/LeaderState.java   |   2 +-
 .../ratis/server/impl/RaftServerImpl.java       | 135 +++++++++++--------
 5 files changed, 114 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9296a3bc/ratis-proto/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 83d4394..103c478 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -316,7 +316,7 @@ message LeaderInfoProto {
 
 message FollowerInfoProto {
   ServerRpcProto leaderInfo = 1;
-  bool inLogSync = 2;
+  uint32 outstandingOp = 2;
 }
 
 message CandidateInfoProto {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9296a3bc/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index f526091..903ab5e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,35 +22,59 @@ import org.apache.ratis.util.Timestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.ToIntFunction;
+
 /**
  * Used when the peer is a follower. Used to track the election timeout.
  */
 class FollowerState extends Daemon {
+  enum UpdateType {
+    APPEND_START(AtomicInteger::incrementAndGet),
+    APPEND_COMPLETE(AtomicInteger::decrementAndGet),
+    INSTALL_SNAPSHOT_START(AtomicInteger::incrementAndGet),
+    INSTALL_SNAPSHOT_COMPLETE(AtomicInteger::decrementAndGet),
+    REQUEST_VOTE(AtomicInteger::get);
+
+    private final ToIntFunction<AtomicInteger> updateFunction;
+
+    UpdateType(ToIntFunction<AtomicInteger> updateFunction) {
+      this.updateFunction = updateFunction;
+    }
+
+    int update(AtomicInteger outstanding) {
+      return updateFunction.applyAsInt(outstanding);
+    }
+  }
+
   static final Logger LOG = LoggerFactory.getLogger(FollowerState.class);
 
   private final RaftServerImpl server;
 
   private volatile Timestamp lastRpcTime = new Timestamp();
   private volatile boolean monitorRunning = true;
-  private volatile boolean inLogSync = false;
+  private final AtomicInteger outstandingOp = new AtomicInteger();
 
   FollowerState(RaftServerImpl server) {
     this.server = server;
   }
 
-  void updateLastRpcTime(boolean inLogSync) {
+  void updateLastRpcTime(UpdateType type) {
     lastRpcTime = new Timestamp();
-    LOG.trace("{} update last rpc time to {} {}", server.getId(),
-        lastRpcTime, inLogSync);
-    this.inLogSync = inLogSync;
+
+    final int n = type.update(outstandingOp);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{}: update lastRpcTime to {} for {}, outstandingOp={}",
+          server.getId(), lastRpcTime, type, n);
+    }
   }
 
   Timestamp getLastRpcTime() {
     return lastRpcTime;
   }
 
-  public boolean isInLogSync() {
-    return inLogSync;
+  int getOutstandingOp() {
+    return outstandingOp.get();
   }
 
   boolean shouldWithholdVotes() {
@@ -72,7 +96,7 @@ class FollowerState extends Daemon {
           break;
         }
         synchronized (server) {
-          if (!inLogSync && lastRpcTime.elapsedTimeMs() >= electionTimeout) {
+          if (outstandingOp.get() == 0 && lastRpcTime.elapsedTimeMs() >= electionTimeout) {
             LOG.info("{} changes to CANDIDATE, lastRpcTime:{}, electionTimeout:{}ms",
                 server.getId(), lastRpcTime.elapsedTimeMs(), electionTimeout);
             // election timeout, should become a candidate

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9296a3bc/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index d62b1a7..5cdc8a9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -170,7 +170,7 @@ class LeaderElection extends Daemon {
           case DISCOVERED_A_NEW_TERM:
             final long term = r.term > server.getState().getCurrentTerm() ?
                 r.term : server.getState().getCurrentTerm();
-            server.changeToFollowerAndPersistMetadata(term);
+            server.changeToFollowerAndPersistMetadata(term, Result.DISCOVERED_A_NEW_TERM);
             return;
           case TIMEOUT:
             // should start another election

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9296a3bc/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 032c3a9..1bc6e79 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -386,7 +386,7 @@ public class LeaderState {
 
   private void stepDown(long term) {
     try {
-      server.changeToFollowerAndPersistMetadata(term);
+      server.changeToFollowerAndPersistMetadata(term, "stepDown");
     } catch(IOException e) {
       final String s = server.getId() + ": Failed to persist metadata for term " + term;
       LOG.warn(s, e);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9296a3bc/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 3c42fcd..4ea78ce 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -157,9 +157,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     return proxy.getServerRpc();
   }
 
-  private void setRole(RaftPeerRole newRole, String op) {
+  private void setRole(RaftPeerRole newRole, Object reason) {
     LOG.info("{} changes role from {} to {} at term {} for {}",
-        getId(), this.role, newRole, state.getCurrentTerm(), op);
+        getId(), this.role, newRole, state.getCurrentTerm(), reason);
     this.role.transitionRole(newRole);
   }
 
@@ -286,29 +286,31 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
   }
 
   /**
-   * Change the server state to Follower if necessary
+   * Change the server state to Follower if this server is in a different role or force is true.
    * @param newTerm The new term.
+   * @param force Force to start a new {@link FollowerState} even if this server is already a follower.
    * @return if the term/votedFor should be updated to the new term
-   * @throws IOException if term/votedFor persistence failed.
    */
-  private synchronized boolean changeToFollower(long newTerm) {
+  private synchronized boolean changeToFollower(long newTerm, boolean force, Object reason) {
     final RaftPeerRole old = role.getCurrentRole();
     final boolean metadataUpdated = state.updateCurrentTerm(newTerm);
 
-    if (old != RaftPeerRole.FOLLOWER) {
-      setRole(RaftPeerRole.FOLLOWER, "changeToFollower");
+    if (old != RaftPeerRole.FOLLOWER || force) {
+      setRole(RaftPeerRole.FOLLOWER, reason);
       if (old == RaftPeerRole.LEADER) {
         role.shutdownLeaderState(false);
       } else if (old == RaftPeerRole.CANDIDATE) {
         role.shutdownLeaderElection();
+      } else if (old == RaftPeerRole.FOLLOWER) {
+        role.shutdownFollowerState();
       }
       role.startFollowerState(this);
     }
     return metadataUpdated;
   }
 
-  synchronized void changeToFollowerAndPersistMetadata(long newTerm) throws IOException {
-    if (changeToFollower(newTerm)) {
+  synchronized void changeToFollowerAndPersistMetadata(long newTerm, Object reason) throws IOException {
+    if (changeToFollower(newTerm, false, reason)) {
       state.persistMetadata();
     }
   }
@@ -368,7 +370,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
             getRaftConf().getPeer(state.getLeaderId()), fs.getLastRpcTime().elapsedTimeMs());
         roleInfo.setFollowerInfo(FollowerInfoProto.newBuilder()
             .setLeaderInfo(leaderInfo)
-            .setInLogSync(fs.isInLogSync()));
+            .setOutstandingOp(fs.getOutstandingOp()));
       });
       break;
 
@@ -734,10 +736,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
             getId(), role, candidateId, candidateTerm, state.getLeaderId(), state.getCurrentTerm(),
             fs != null? fs.getLastRpcTime().elapsedTimeMs() + "ms": null);
       } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
-        final boolean termUpdated = changeToFollower(candidateTerm);
+        final boolean termUpdated = changeToFollower(candidateTerm, true, "recognizeCandidate:" + candidateId);
         // see Section 5.4.1 Election restriction
         if (state.isLogUpToDate(candidateLastEntry) && fs != null) {
-          fs.updateLastRpcTime(false);
+          fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE);
           state.grantVote(candidateId);
           voteGranted = true;
         }
@@ -806,10 +808,17 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
         .toArray(new LogEntryProto[r.getEntriesCount()]);
     final TermIndex previous = r.hasPreviousLog() ?
         ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null;
-    return appendEntriesAsync(RaftPeerId.valueOf(request.getRequestorId()),
-        ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(),
-        previous, r.getLeaderCommit(), request.getCallId(), r.getInitializing(),
-        r.getCommitInfosList(), entries);
+    final RaftPeerId requestorId = RaftPeerId.valueOf(request.getRequestorId());
+
+    preAppendEntriesAsync(requestorId, ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(),
+        previous, r.getLeaderCommit(), r.getInitializing(), entries);
+    try {
+      return appendEntriesAsync(requestorId, r.getLeaderTerm(), previous, r.getLeaderCommit(),
+          request.getCallId(), r.getInitializing(), r.getCommitInfosList(), entries);
+    } catch(Throwable t) {
+      LOG.error(getId() + ": Failed appendEntriesAsync " + r, t);
+      throw t;
+    }
   }
 
   static void logAppendEntries(boolean isHeartbeat, Supplier<String> message) {
@@ -824,24 +833,20 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     }
   }
 
-  private void updateLastRpcTime(boolean inLogSync) {
-    if (lifeCycle.getCurrentState() == RUNNING) {
-      role.getFollowerState().ifPresent(fs -> fs.updateLastRpcTime(inLogSync));
+  private Optional<FollowerState> updateLastRpcTime(FollowerState.UpdateType updateType) {
+    final Optional<FollowerState> fs = role.getFollowerState();
+    if (fs.isPresent() && lifeCycle.getCurrentState() == RUNNING) {
+      fs.get().updateLastRpcTime(updateType);
+      return fs;
+    } else {
+      return Optional.empty();
     }
   }
 
-  private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
-      RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm,
-      TermIndex previous, long leaderCommit, long callId, boolean initializing,
-      List<CommitInfoProto> commitInfos, LogEntryProto... entries) throws IOException {
+  private void preAppendEntriesAsync(RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm,
+      TermIndex previous, long leaderCommit, boolean initializing, LogEntryProto... entries) throws IOException {
     CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
         leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
-    final boolean isHeartbeat = entries.length == 0;
-    logAppendEntries(isHeartbeat,
-        () -> getId() + ": receive appendEntries(" + leaderId + ", " + leaderGroupId + ", "
-            + leaderTerm + ", " + previous + ", " + leaderCommit + ", " + initializing
-            + ", commits" + ProtoUtils.toString(commitInfos)
-            + ", entries: " + ServerProtoUtils.toString(entries));
 
     final LifeCycle.State currentState = assertLifeCycleState(STARTING, RUNNING);
     if (currentState == STARTING) {
@@ -856,12 +861,23 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     } catch (IllegalArgumentException e) {
       throw new IOException(e);
     }
+  }
 
+  private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
+      RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, long callId, boolean initializing,
+      List<CommitInfoProto> commitInfos, LogEntryProto... entries) {
+    final boolean isHeartbeat = entries.length == 0;
+    logAppendEntries(isHeartbeat,
+        () -> getId() + ": receive appendEntries(" + leaderId + ", " + leaderTerm + ", "
+            + previous + ", " + leaderCommit + ", " + initializing
+            + ", commits" + ProtoUtils.toString(commitInfos)
+            + ", entries: " + ServerProtoUtils.toString(entries));
     final List<CompletableFuture<Long>> futures;
 
     final long currentTerm;
     final long nextIndex = state.getLog().getNextIndex();
     final long followerCommit = state.getLog().getLastCommittedIndex();
+    final Optional<FollowerState> followerState;
     synchronized (this) {
       final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
       currentTerm = state.getCurrentTerm();
@@ -874,13 +890,17 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
         }
         return CompletableFuture.completedFuture(reply);
       }
-      changeToFollowerAndPersistMetadata(leaderTerm);
+      try {
+        changeToFollowerAndPersistMetadata(leaderTerm, "appendEntries");
+      } catch (IOException e) {
+        return JavaUtils.completeExceptionally(e);
+      }
       state.setLeader(leaderId, "appendEntries");
 
       if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
         role.startFollowerState(this);
       }
-      updateLastRpcTime(true);
+      followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START);
 
       // We need to check if "previous" is in the local peer. Note that it is
       // possible that "previous" is covered by the latest snapshot: e.g.,
@@ -896,6 +916,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
           LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}",
               getId(), previous, ServerProtoUtils.toString(reply));
         }
+        followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
         return CompletableFuture.completedFuture(reply);
       }
 
@@ -908,14 +929,11 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     if (!isHeartbeat) {
       CodeInjectionForTesting.execute(RaftLog.LOG_SYNC, getId(), null);
     }
-    return JavaUtils.allOf(futures).thenApplyAsync(v -> {
+    return JavaUtils.allOf(futures).whenCompleteAsync(
+        (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE))
+    ).thenApply(v -> {
       final AppendEntriesReplyProto reply;
       synchronized(this) {
-        if (lifeCycle.getCurrentState() == RUNNING && isFollower()
-            && getState().getCurrentTerm() == currentTerm) {
-          // reset election timer to avoid punishing the leader for our own long disk writes
-          updateLastRpcTime(false);
-        }
         state.updateStatemachine(leaderCommit, currentTerm);
         final long n = isHeartbeat? state.getLog().getNextIndex(): entries[entries.length - 1].getIndex() + 1;
         reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), groupId, currentTerm,
@@ -957,6 +975,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex(
         request.getTermIndex());
     final long lastIncludedIndex = lastTermIndex.getIndex();
+    final Optional<FollowerState> followerState;
     synchronized (this) {
       final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
       currentTerm = state.getCurrentTerm();
@@ -968,28 +987,30 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
             " Reply: {}", getId(), reply);
         return reply;
       }
-      changeToFollowerAndPersistMetadata(leaderTerm);
+      changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
       state.setLeader(leaderId, "installSnapshot");
 
-      updateLastRpcTime(true);
-
-      // Check and append the snapshot chunk. We simply put this in lock
-      // considering a follower peer requiring a snapshot installation does not
-      // have a lot of requests
-      Preconditions.assertTrue(
-          state.getLog().getNextIndex() <= lastIncludedIndex,
-          "%s log's next id is %s, last included index in snapshot is %s",
-          getId(),  state.getLog().getNextIndex(), lastIncludedIndex);
-
-      //TODO: We should only update State with installed snapshot once the request is done.
-      state.installSnapshot(request);
-
-      // update the committed index
-      // re-load the state machine if this is the last chunk
-      if (request.getDone()) {
-        state.reloadStateMachine(lastIncludedIndex, leaderTerm);
+      followerState = updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
+      try {
+        // Check and append the snapshot chunk. We simply put this in lock
+        // considering a follower peer requiring a snapshot installation does not
+        // have a lot of requests
+        Preconditions.assertTrue(
+            state.getLog().getNextIndex() <= lastIncludedIndex,
+            "%s log's next id is %s, last included index in snapshot is %s",
+            getId(), state.getLog().getNextIndex(), lastIncludedIndex);
+
+        //TODO: We should only update State with installed snapshot once the request is done.
+        state.installSnapshot(request);
+
+        // update the committed index
+        // re-load the state machine if this is the last chunk
+        if (request.getDone()) {
+          state.reloadStateMachine(lastIncludedIndex, leaderTerm);
+        }
+      } finally {
+        updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);
       }
-      updateLastRpcTime(false);
     }
     if (request.getDone()) {
       LOG.info("{}: successfully install the whole snapshot-{}", getId(),