You are viewing a plain-text version of this content; the canonical version is available in the mailing-list archive.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/12/03 01:26:43 UTC
incubator-ratis git commit: RATIS-443. FollowerState.inLogSync can be incorrectly cleared in appendEntriesAsync.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 609773e03 -> 9296a3bc0
RATIS-443. FollowerState.inLogSync can be incorrectly cleared in appendEntriesAsync.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/9296a3bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/9296a3bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/9296a3bc
Branch: refs/heads/master
Commit: 9296a3bc0b79cf73a6576e0b4c0b981d1ea3b221
Parents: 609773e
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Sun Dec 2 16:57:20 2018 -0800
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Sun Dec 2 16:57:20 2018 -0800
----------------------------------------------------------------------
ratis-proto/src/main/proto/Raft.proto | 2 +-
.../apache/ratis/server/impl/FollowerState.java | 42 ++++--
.../ratis/server/impl/LeaderElection.java | 2 +-
.../apache/ratis/server/impl/LeaderState.java | 2 +-
.../ratis/server/impl/RaftServerImpl.java | 135 +++++++++++--------
5 files changed, 114 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9296a3bc/ratis-proto/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 83d4394..103c478 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -316,7 +316,7 @@ message LeaderInfoProto {
message FollowerInfoProto {
ServerRpcProto leaderInfo = 1;
- bool inLogSync = 2;
+ uint32 outstandingOp = 2;
}
message CandidateInfoProto {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9296a3bc/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index f526091..903ab5e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,35 +22,59 @@ import org.apache.ratis.util.Timestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.ToIntFunction;
+
/**
* Used when the peer is a follower. Used to track the election timeout.
*/
class FollowerState extends Daemon {
+ enum UpdateType {
+ APPEND_START(AtomicInteger::incrementAndGet),
+ APPEND_COMPLETE(AtomicInteger::decrementAndGet),
+ INSTALL_SNAPSHOT_START(AtomicInteger::incrementAndGet),
+ INSTALL_SNAPSHOT_COMPLETE(AtomicInteger::decrementAndGet),
+ REQUEST_VOTE(AtomicInteger::get);
+
+ private final ToIntFunction<AtomicInteger> updateFunction;
+
+ UpdateType(ToIntFunction<AtomicInteger> updateFunction) {
+ this.updateFunction = updateFunction;
+ }
+
+ int update(AtomicInteger outstanding) {
+ return updateFunction.applyAsInt(outstanding);
+ }
+ }
+
static final Logger LOG = LoggerFactory.getLogger(FollowerState.class);
private final RaftServerImpl server;
private volatile Timestamp lastRpcTime = new Timestamp();
private volatile boolean monitorRunning = true;
- private volatile boolean inLogSync = false;
+ private final AtomicInteger outstandingOp = new AtomicInteger();
FollowerState(RaftServerImpl server) {
this.server = server;
}
- void updateLastRpcTime(boolean inLogSync) {
+ void updateLastRpcTime(UpdateType type) {
lastRpcTime = new Timestamp();
- LOG.trace("{} update last rpc time to {} {}", server.getId(),
- lastRpcTime, inLogSync);
- this.inLogSync = inLogSync;
+
+ final int n = type.update(outstandingOp);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{}: update lastRpcTime to {} for {}, outstandingOp={}",
+ server.getId(), lastRpcTime, type, n);
+ }
}
Timestamp getLastRpcTime() {
return lastRpcTime;
}
- public boolean isInLogSync() {
- return inLogSync;
+ int getOutstandingOp() {
+ return outstandingOp.get();
}
boolean shouldWithholdVotes() {
@@ -72,7 +96,7 @@ class FollowerState extends Daemon {
break;
}
synchronized (server) {
- if (!inLogSync && lastRpcTime.elapsedTimeMs() >= electionTimeout) {
+ if (outstandingOp.get() == 0 && lastRpcTime.elapsedTimeMs() >= electionTimeout) {
LOG.info("{} changes to CANDIDATE, lastRpcTime:{}, electionTimeout:{}ms",
server.getId(), lastRpcTime.elapsedTimeMs(), electionTimeout);
// election timeout, should become a candidate
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9296a3bc/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index d62b1a7..5cdc8a9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -170,7 +170,7 @@ class LeaderElection extends Daemon {
case DISCOVERED_A_NEW_TERM:
final long term = r.term > server.getState().getCurrentTerm() ?
r.term : server.getState().getCurrentTerm();
- server.changeToFollowerAndPersistMetadata(term);
+ server.changeToFollowerAndPersistMetadata(term, Result.DISCOVERED_A_NEW_TERM);
return;
case TIMEOUT:
// should start another election
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9296a3bc/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 032c3a9..1bc6e79 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -386,7 +386,7 @@ public class LeaderState {
private void stepDown(long term) {
try {
- server.changeToFollowerAndPersistMetadata(term);
+ server.changeToFollowerAndPersistMetadata(term, "stepDown");
} catch(IOException e) {
final String s = server.getId() + ": Failed to persist metadata for term " + term;
LOG.warn(s, e);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9296a3bc/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 3c42fcd..4ea78ce 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -157,9 +157,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
return proxy.getServerRpc();
}
- private void setRole(RaftPeerRole newRole, String op) {
+ private void setRole(RaftPeerRole newRole, Object reason) {
LOG.info("{} changes role from {} to {} at term {} for {}",
- getId(), this.role, newRole, state.getCurrentTerm(), op);
+ getId(), this.role, newRole, state.getCurrentTerm(), reason);
this.role.transitionRole(newRole);
}
@@ -286,29 +286,31 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
}
/**
- * Change the server state to Follower if necessary
+ * Change the server state to Follower if this server is in a different role or force is true.
* @param newTerm The new term.
+ * @param force Force to start a new {@link FollowerState} even if this server is already a follower.
* @return if the term/votedFor should be updated to the new term
- * @throws IOException if term/votedFor persistence failed.
*/
- private synchronized boolean changeToFollower(long newTerm) {
+ private synchronized boolean changeToFollower(long newTerm, boolean force, Object reason) {
final RaftPeerRole old = role.getCurrentRole();
final boolean metadataUpdated = state.updateCurrentTerm(newTerm);
- if (old != RaftPeerRole.FOLLOWER) {
- setRole(RaftPeerRole.FOLLOWER, "changeToFollower");
+ if (old != RaftPeerRole.FOLLOWER || force) {
+ setRole(RaftPeerRole.FOLLOWER, reason);
if (old == RaftPeerRole.LEADER) {
role.shutdownLeaderState(false);
} else if (old == RaftPeerRole.CANDIDATE) {
role.shutdownLeaderElection();
+ } else if (old == RaftPeerRole.FOLLOWER) {
+ role.shutdownFollowerState();
}
role.startFollowerState(this);
}
return metadataUpdated;
}
- synchronized void changeToFollowerAndPersistMetadata(long newTerm) throws IOException {
- if (changeToFollower(newTerm)) {
+ synchronized void changeToFollowerAndPersistMetadata(long newTerm, Object reason) throws IOException {
+ if (changeToFollower(newTerm, false, reason)) {
state.persistMetadata();
}
}
@@ -368,7 +370,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
getRaftConf().getPeer(state.getLeaderId()), fs.getLastRpcTime().elapsedTimeMs());
roleInfo.setFollowerInfo(FollowerInfoProto.newBuilder()
.setLeaderInfo(leaderInfo)
- .setInLogSync(fs.isInLogSync()));
+ .setOutstandingOp(fs.getOutstandingOp()));
});
break;
@@ -734,10 +736,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
getId(), role, candidateId, candidateTerm, state.getLeaderId(), state.getCurrentTerm(),
fs != null? fs.getLastRpcTime().elapsedTimeMs() + "ms": null);
} else if (state.recognizeCandidate(candidateId, candidateTerm)) {
- final boolean termUpdated = changeToFollower(candidateTerm);
+ final boolean termUpdated = changeToFollower(candidateTerm, true, "recognizeCandidate:" + candidateId);
// see Section 5.4.1 Election restriction
if (state.isLogUpToDate(candidateLastEntry) && fs != null) {
- fs.updateLastRpcTime(false);
+ fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE);
state.grantVote(candidateId);
voteGranted = true;
}
@@ -806,10 +808,17 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
.toArray(new LogEntryProto[r.getEntriesCount()]);
final TermIndex previous = r.hasPreviousLog() ?
ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null;
- return appendEntriesAsync(RaftPeerId.valueOf(request.getRequestorId()),
- ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(),
- previous, r.getLeaderCommit(), request.getCallId(), r.getInitializing(),
- r.getCommitInfosList(), entries);
+ final RaftPeerId requestorId = RaftPeerId.valueOf(request.getRequestorId());
+
+ preAppendEntriesAsync(requestorId, ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(),
+ previous, r.getLeaderCommit(), r.getInitializing(), entries);
+ try {
+ return appendEntriesAsync(requestorId, r.getLeaderTerm(), previous, r.getLeaderCommit(),
+ request.getCallId(), r.getInitializing(), r.getCommitInfosList(), entries);
+ } catch(Throwable t) {
+ LOG.error(getId() + ": Failed appendEntriesAsync " + r, t);
+ throw t;
+ }
}
static void logAppendEntries(boolean isHeartbeat, Supplier<String> message) {
@@ -824,24 +833,20 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
}
}
- private void updateLastRpcTime(boolean inLogSync) {
- if (lifeCycle.getCurrentState() == RUNNING) {
- role.getFollowerState().ifPresent(fs -> fs.updateLastRpcTime(inLogSync));
+ private Optional<FollowerState> updateLastRpcTime(FollowerState.UpdateType updateType) {
+ final Optional<FollowerState> fs = role.getFollowerState();
+ if (fs.isPresent() && lifeCycle.getCurrentState() == RUNNING) {
+ fs.get().updateLastRpcTime(updateType);
+ return fs;
+ } else {
+ return Optional.empty();
}
}
- private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
- RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm,
- TermIndex previous, long leaderCommit, long callId, boolean initializing,
- List<CommitInfoProto> commitInfos, LogEntryProto... entries) throws IOException {
+ private void preAppendEntriesAsync(RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm,
+ TermIndex previous, long leaderCommit, boolean initializing, LogEntryProto... entries) throws IOException {
CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
- final boolean isHeartbeat = entries.length == 0;
- logAppendEntries(isHeartbeat,
- () -> getId() + ": receive appendEntries(" + leaderId + ", " + leaderGroupId + ", "
- + leaderTerm + ", " + previous + ", " + leaderCommit + ", " + initializing
- + ", commits" + ProtoUtils.toString(commitInfos)
- + ", entries: " + ServerProtoUtils.toString(entries));
final LifeCycle.State currentState = assertLifeCycleState(STARTING, RUNNING);
if (currentState == STARTING) {
@@ -856,12 +861,23 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
} catch (IllegalArgumentException e) {
throw new IOException(e);
}
+ }
+ private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
+ RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, long callId, boolean initializing,
+ List<CommitInfoProto> commitInfos, LogEntryProto... entries) {
+ final boolean isHeartbeat = entries.length == 0;
+ logAppendEntries(isHeartbeat,
+ () -> getId() + ": receive appendEntries(" + leaderId + ", " + leaderTerm + ", "
+ + previous + ", " + leaderCommit + ", " + initializing
+ + ", commits" + ProtoUtils.toString(commitInfos)
+ + ", entries: " + ServerProtoUtils.toString(entries));
final List<CompletableFuture<Long>> futures;
final long currentTerm;
final long nextIndex = state.getLog().getNextIndex();
final long followerCommit = state.getLog().getLastCommittedIndex();
+ final Optional<FollowerState> followerState;
synchronized (this) {
final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
@@ -874,13 +890,17 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
}
return CompletableFuture.completedFuture(reply);
}
- changeToFollowerAndPersistMetadata(leaderTerm);
+ try {
+ changeToFollowerAndPersistMetadata(leaderTerm, "appendEntries");
+ } catch (IOException e) {
+ return JavaUtils.completeExceptionally(e);
+ }
state.setLeader(leaderId, "appendEntries");
if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
role.startFollowerState(this);
}
- updateLastRpcTime(true);
+ followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START);
// We need to check if "previous" is in the local peer. Note that it is
// possible that "previous" is covered by the latest snapshot: e.g.,
@@ -896,6 +916,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}",
getId(), previous, ServerProtoUtils.toString(reply));
}
+ followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
return CompletableFuture.completedFuture(reply);
}
@@ -908,14 +929,11 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
if (!isHeartbeat) {
CodeInjectionForTesting.execute(RaftLog.LOG_SYNC, getId(), null);
}
- return JavaUtils.allOf(futures).thenApplyAsync(v -> {
+ return JavaUtils.allOf(futures).whenCompleteAsync(
+ (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE))
+ ).thenApply(v -> {
final AppendEntriesReplyProto reply;
synchronized(this) {
- if (lifeCycle.getCurrentState() == RUNNING && isFollower()
- && getState().getCurrentTerm() == currentTerm) {
- // reset election timer to avoid punishing the leader for our own long disk writes
- updateLastRpcTime(false);
- }
state.updateStatemachine(leaderCommit, currentTerm);
final long n = isHeartbeat? state.getLog().getNextIndex(): entries[entries.length - 1].getIndex() + 1;
reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), groupId, currentTerm,
@@ -957,6 +975,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex(
request.getTermIndex());
final long lastIncludedIndex = lastTermIndex.getIndex();
+ final Optional<FollowerState> followerState;
synchronized (this) {
final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
@@ -968,28 +987,30 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
" Reply: {}", getId(), reply);
return reply;
}
- changeToFollowerAndPersistMetadata(leaderTerm);
+ changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
state.setLeader(leaderId, "installSnapshot");
- updateLastRpcTime(true);
-
- // Check and append the snapshot chunk. We simply put this in lock
- // considering a follower peer requiring a snapshot installation does not
- // have a lot of requests
- Preconditions.assertTrue(
- state.getLog().getNextIndex() <= lastIncludedIndex,
- "%s log's next id is %s, last included index in snapshot is %s",
- getId(), state.getLog().getNextIndex(), lastIncludedIndex);
-
- //TODO: We should only update State with installed snapshot once the request is done.
- state.installSnapshot(request);
-
- // update the committed index
- // re-load the state machine if this is the last chunk
- if (request.getDone()) {
- state.reloadStateMachine(lastIncludedIndex, leaderTerm);
+ followerState = updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
+ try {
+ // Check and append the snapshot chunk. We simply put this in lock
+ // considering a follower peer requiring a snapshot installation does not
+ // have a lot of requests
+ Preconditions.assertTrue(
+ state.getLog().getNextIndex() <= lastIncludedIndex,
+ "%s log's next id is %s, last included index in snapshot is %s",
+ getId(), state.getLog().getNextIndex(), lastIncludedIndex);
+
+ //TODO: We should only update State with installed snapshot once the request is done.
+ state.installSnapshot(request);
+
+ // update the committed index
+ // re-load the state machine if this is the last chunk
+ if (request.getDone()) {
+ state.reloadStateMachine(lastIncludedIndex, leaderTerm);
+ }
+ } finally {
+ updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);
}
- updateLastRpcTime(false);
}
if (request.getDone()) {
LOG.info("{}: successfully install the whole snapshot-{}", getId(),