You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2019/09/17 19:43:44 UTC
[incubator-ratis] branch master updated: RATIS-655. Change
LeaderState and FollowerState to use RaftGroupMemberId.
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c9308f6 RATIS-655. Change LeaderState and FollowerState to use RaftGroupMemberId.
c9308f6 is described below
commit c9308f64d14c49d4edd9eb116853de7c46794ed9
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Sep 17 12:43:13 2019 -0700
RATIS-655. Change LeaderState and FollowerState to use RaftGroupMemberId.
---
.../apache/ratis/server/impl/FollowerState.java | 22 ++++----
.../apache/ratis/server/impl/LeaderElection.java | 18 +++----
.../org/apache/ratis/server/impl/LeaderState.java | 61 ++++++++++++++--------
.../apache/ratis/server/impl/PendingRequests.java | 4 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 12 ++---
.../test/java/org/apache/ratis/RaftTestUtil.java | 4 +-
6 files changed, 68 insertions(+), 53 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index c20b1a8..d82af65 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -51,13 +51,15 @@ class FollowerState extends Daemon {
static final Logger LOG = LoggerFactory.getLogger(FollowerState.class);
+ private final String name;
private final RaftServerImpl server;
private volatile Timestamp lastRpcTime = Timestamp.currentTime();
- private volatile boolean monitorRunning = true;
+ private volatile boolean isRunning = true;
private final AtomicInteger outstandingOp = new AtomicInteger();
FollowerState(RaftServerImpl server) {
+ this.name = server.getMemberId() + "-" + getClass().getSimpleName();
this.server = server;
}
@@ -66,8 +68,7 @@ class FollowerState extends Daemon {
final int n = type.update(outstandingOp);
if (LOG.isTraceEnabled()) {
- LOG.trace("{}: update lastRpcTime to {} for {}, outstandingOp={}",
- server.getId(), lastRpcTime, type, n);
+ LOG.trace("{}: update lastRpcTime to {} for {}, outstandingOp={}", this, lastRpcTime, type, n);
}
}
@@ -84,27 +85,28 @@ class FollowerState extends Daemon {
}
void stopRunning() {
- this.monitorRunning = false;
+ this.isRunning = false;
}
@Override
public void run() {
long sleepDeviationThresholdMs = server.getSleepDeviationThresholdMs();
- while (monitorRunning && server.isFollower()) {
+ while (isRunning && server.isFollower()) {
final long electionTimeout = server.getRandomTimeoutMs();
try {
if (!JavaUtils.sleep(electionTimeout, sleepDeviationThresholdMs)) {
continue;
}
- if (!monitorRunning || !server.isFollower()) {
- LOG.info("{} heartbeat monitor quit", server.getId());
+ final boolean isFollower = server.isFollower();
+ if (!isRunning || !isFollower) {
+ LOG.info("{}: Stopping now (isRunning? {}, isFollower? {})", this, isRunning, isFollower);
break;
}
synchronized (server) {
if (outstandingOp.get() == 0 && lastRpcTime.elapsedTimeMs() >= electionTimeout) {
- LOG.info("{}:{} changes to CANDIDATE, lastRpcTime:{}, electionTimeout:{}ms",
- server.getId(), server.getGroupId(), lastRpcTime.elapsedTimeMs(), electionTimeout);
+ LOG.info("{}: change to CANDIDATE, lastRpcTime:{}ms, electionTimeout:{}ms",
+ this, lastRpcTime.elapsedTimeMs(), electionTimeout);
server.getLeaderElectionMetricsRegistry().onLeaderElectionTimeout(); // Update timeout metric counters.
// election timeout, should become a candidate
server.changeToCandidate();
@@ -123,6 +125,6 @@ class FollowerState extends Daemon {
@Override
public String toString() {
- return server.getId() + ": " + getClass().getSimpleName();
+ return name;
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 3c81348..2c77627 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -110,7 +110,7 @@ class LeaderElection implements Runnable {
private final RaftServerImpl server;
LeaderElection(RaftServerImpl server) {
- this.name = server.getMemberId() + ":" + getClass().getSimpleName() + COUNT.incrementAndGet();
+ this.name = server.getMemberId() + "-" + getClass().getSimpleName() + COUNT.incrementAndGet();
this.lifeCycle = new LifeCycle(this);
this.daemon = new Daemon(this);
this.server = server;
@@ -133,19 +133,15 @@ class LeaderElection implements Runnable {
askForVotes();
} catch(Throwable e) {
final LifeCycle.State state = lifeCycle.getCurrentState();
- final String message = "Failed " + this + ", state=" + state;
-
if (state.isClosingOrClosed()) {
- LogUtils.infoOrTrace(LOG, message, e);
LOG.info("{}: {} is safely ignored since this is already {}",
- this, e.getClass().getSimpleName(), state);
+ this, e.getClass().getSimpleName(), state, e);
} else {
if (!server.isAlive()) {
- LogUtils.infoOrTrace(LOG, message, e);
LOG.info("{}: {} is safely ignored since the server is not alive: {}",
- this, e.getClass().getSimpleName(), server);
+ this, e.getClass().getSimpleName(), server, e);
} else {
- LOG.error(message, e);
+ LOG.error("{}: Failed, state={}", this, state, e);
}
shutdown();
}
@@ -264,8 +260,10 @@ class LeaderElection implements Runnable {
final RaftPeerId replierId = RaftPeerId.valueOf(r.getServerReply().getReplyId());
final RequestVoteReplyProto previous = responses.putIfAbsent(replierId, r);
if (previous != null) {
- LOG.warn("{} received duplicated replies from {}, the 2nd reply is ignored: 1st = {}, 2nd = {}",
- server.getId(), replierId, ServerProtoUtils.toString(previous), ServerProtoUtils.toString(r));
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("{} received duplicated replies from {}, the 2nd reply is ignored: 1st={}, 2nd={}",
+ this, replierId, ServerProtoUtils.toString(previous), ServerProtoUtils.toString(r));
+ }
continue;
}
if (r.getShouldShutdown()) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 8750d29..8b8b50a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -99,13 +99,14 @@ public class LeaderState {
}
private class EventQueue {
+ private final String name = server.getMemberId() + "-" + getClass().getSimpleName();
private final BlockingQueue<StateUpdateEvent> queue = new ArrayBlockingQueue<>(4096);
void submit(StateUpdateEvent event) {
try {
queue.put(event);
} catch (InterruptedException e) {
- LOG.info("{}: Interrupted when submitting {} ", server.getId(), event);
+ LOG.info("{}: Interrupted when submitting {} ", this, event);
}
}
@@ -114,7 +115,7 @@ public class LeaderState {
try {
e = queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS);
} catch(InterruptedException ie) {
- String s = server.getId() + ": " + getClass().getSimpleName() + " thread is interrupted";
+ String s = this + ": poll() is interrupted";
if (!running) {
LOG.info(s + " gracefully");
return null;
@@ -129,6 +130,11 @@ public class LeaderState {
}
return e;
}
+
+ @Override
+ public String toString() {
+ return name;
+ }
}
/**
@@ -174,6 +180,7 @@ public class LeaderState {
private final StateUpdateEvent CHECK_STAGING_EVENT =
new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING, -1, this::checkStaging);
+ private final String name;
private final RaftServerImpl server;
private final RaftLog raftLog;
private final long currentTerm;
@@ -185,7 +192,7 @@ public class LeaderState {
* The list is protected by the RaftServer's lock.
*/
private final SenderList senders;
- private final EventQueue eventQueue = new EventQueue();
+ private final EventQueue eventQueue;
private final EventProcessor processor;
private final PendingRequests pendingRequests;
private final WatchRequests watchRequests;
@@ -197,6 +204,7 @@ public class LeaderState {
private final HeartbeatMetrics heartbeatMetrics;
LeaderState(RaftServerImpl server, RaftProperties properties) {
+ this.name = server.getMemberId() + "-" + getClass().getSimpleName();
this.server = server;
stagingCatchupGap = RaftServerConfigKeys.stagingCatchupGap(properties);
@@ -205,9 +213,11 @@ public class LeaderState {
final ServerState state = server.getState();
this.raftLog = state.getLog();
this.currentTerm = state.getCurrentTerm();
+
+ this.eventQueue = new EventQueue();
processor = new EventProcessor();
- this.pendingRequests = new PendingRequests(server.getId(), properties);
- this.watchRequests = new WatchRequests(server.getId(), properties);
+ this.pendingRequests = new PendingRequests(server.getMemberId(), properties);
+ this.watchRequests = new WatchRequests(server.getMemberId(), properties);
final RaftConfiguration conf = server.getRaftConf();
Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
@@ -249,9 +259,9 @@ public class LeaderState {
server.getStateMachine().notifyNotLeader(transactions);
watchRequests.failWatches(nle);
} catch (IOException e) {
- LOG.warn(server.getId() + ": Caught exception in sendNotLeaderResponses", e);
+ LOG.warn("{}: Caught exception in sendNotLeaderResponses", this, e);
}
- server.getServerRpc().notifyNotLeader(server.getGroupId());
+ server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId());
}
void notifySenders() {
@@ -274,6 +284,7 @@ public class LeaderState {
* Start bootstrapping new peers
*/
PendingRequest startSetConfiguration(SetConfigurationRequest request) {
+ LOG.info("{}: startSetConfiguration {}", this, request);
Preconditions.assertTrue(running && !inStagingState());
final List<RaftPeer> peersInNewConf = request.getPeersInNewConf();
@@ -303,14 +314,14 @@ public class LeaderState {
PendingRequest addPendingRequest(PendingRequests.Permit permit, RaftClientRequest request, TransactionContext entry) {
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: addPendingRequest at {}, entry=", server.getId(), request,
+ LOG.debug("{}: addPendingRequest at {}, entry={}", this, request,
ServerProtoUtils.toLogEntryString(entry.getLogEntry()));
}
return pendingRequests.add(permit, request, entry);
}
CompletableFuture<RaftClientReply> addWatchReqeust(RaftClientRequest request) {
- LOG.debug("{}: addWatchRequest {}", server.getId(), request);
+ LOG.debug("{}: addWatchRequest {}", this, request);
return watchRequests.add(request)
.thenApply(v -> new RaftClientReply(request, server.getCommitInfos()))
.exceptionally(e -> {
@@ -395,7 +406,7 @@ public class LeaderState {
void restartSender(LogAppender sender) {
final FollowerInfo follower = sender.getFollower();
- LOG.info("{}: Restarting {} for {}", server.getId(), sender.getClass().getSimpleName(), follower.getName());
+ LOG.info("{}: Restarting {} for {}", this, sender.getClass().getSimpleName(), follower.getName());
senders.removeAll(Collections.singleton(sender));
addAndStartSenders(Collections.singleton(follower.getPeer()));
}
@@ -420,7 +431,7 @@ public class LeaderState {
try {
server.changeToFollowerAndPersistMetadata(term, "stepDown");
} catch(IOException e) {
- final String s = server.getId() + ": Failed to persist metadata for term " + term;
+ final String s = this + ": Failed to persist metadata for term " + term;
LOG.warn(s, e);
// the failure should happen while changing the state to follower
// thus the in-memory state should have been updated
@@ -484,8 +495,7 @@ public class LeaderState {
final Timestamp progressTime = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
final Timestamp timeoutTime = Timestamp.currentTime().addTimeMs(-3*server.getMaxTimeoutMs());
if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) {
- LOG.debug("{} detects a follower {} timeout for bootstrapping," +
- " timeoutTime: {}", server.getId(), follower, timeoutTime);
+ LOG.debug("{} detects a follower {} timeout ({}) for bootstrapping", this, follower, timeoutTime);
return BootStrapProgress.NOPROGRESS;
} else if (follower.getMatchIndex() + stagingCatchupGap > committed
&& follower.getLastRpcResponseTime().compareTo(progressTime) > 0) {
@@ -516,8 +526,7 @@ public class LeaderState {
.getLastCommittedIndex();
Collection<BootStrapProgress> reports = checkAllProgress(committedIndex);
if (reports.contains(BootStrapProgress.NOPROGRESS)) {
- LOG.debug("{} fails the setConfiguration request", server.getId());
- stagingState.fail();
+ stagingState.fail(BootStrapProgress.NOPROGRESS);
} else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
// all caught up!
applyOldNewConf();
@@ -640,8 +649,7 @@ public class LeaderState {
pendingRequests.replySetConfiguration(server::getCommitInfos);
// if the leader is not included in the current configuration, step down
if (!conf.containsInConf(server.getId())) {
- LOG.info("{} is not included in the new configuration {}. Step down.",
- server.getId(), conf);
+ LOG.info("{} is not included in the new configuration {}. Will shutdown server...", this, conf);
try {
// leave some time for all RPC senders to send out new conf entry
Thread.sleep(server.getMinTimeoutMs());
@@ -725,6 +733,7 @@ public class LeaderState {
}
private class ConfigurationStagingState {
+ private final String name = server.getMemberId() + "-" + getClass().getSimpleName();
private final Map<RaftPeerId, RaftPeer> newPeers;
private final PeerConfiguration newConf;
@@ -755,14 +764,19 @@ public class LeaderState {
return newPeers.containsKey(peerId);
}
- void fail() {
+ void fail(BootStrapProgress progress) {
+ final String message = this + ": Fail to set configuration " + newConf + " due to " + progress;
+ LOG.debug(message);
stopAndRemoveSenders(s -> !s.getFollower().isAttendingVote());
LeaderState.this.stagingState = null;
// send back failure response to client's request
- pendingRequests.failSetConfiguration(
- new ReconfigurationTimeoutException("Fail to set configuration "
- + newConf + ". Timeout when bootstrapping new peers."));
+ pendingRequests.failSetConfiguration(new ReconfigurationTimeoutException(message));
+ }
+
+ @Override
+ public String toString() {
+ return name;
}
}
@@ -787,4 +801,9 @@ public class LeaderState {
void recordFollowerHeartbeatElapsedTime(String followerId, long elapsedTime) {
heartbeatMetrics.recordFollowerHeartbeatElapsedTime(followerId, elapsedTime);
}
+
+ @Override
+ public String toString() {
+ return name;
+ }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 46f0c1f..7fa03b8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -24,7 +24,7 @@ import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftException;
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.TransactionContext;
@@ -135,7 +135,7 @@ class PendingRequests {
private final String name;
private final RequestMap pendingRequests;
- PendingRequests(RaftPeerId id, RaftProperties properties) {
+ PendingRequests(RaftGroupMemberId id, RaftProperties properties) {
this.name = id + "-" + getClass().getSimpleName();
this.pendingRequests = new RequestMap(id, RaftServerConfigKeys.Write.elementLimit(properties));
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 8395b06..bc4e69a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -149,10 +149,6 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
return sleepDeviationThresholdMs;
}
- public RaftGroupId getGroupId() {
- return getMemberId().getGroupId();
- }
-
public StateMachine getStateMachine() {
return stateMachine;
}
@@ -189,7 +185,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
startInitializing();
}
- registerMBean(getId(), getGroupId(), jmxAdapter, jmxAdapter);
+ registerMBean(getId(), getMemberId().getGroupId(), jmxAdapter, jmxAdapter);
state.start();
return true;
}
@@ -243,7 +239,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
}
RaftGroup getGroup() {
- return RaftGroup.valueOf(getGroupId(), getRaftConf().getPeers());
+ return RaftGroup.valueOf(getMemberId().getGroupId(), getRaftConf().getPeers());
}
public void shutdown(boolean deleteDirectory) {
@@ -474,7 +470,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
}
void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws GroupMismatchException {
- final RaftGroupId groupId = getGroupId();
+ final RaftGroupId groupId = getMemberId().getGroupId();
if (!groupId.equals(requestorGroupId)) {
throw new GroupMismatchException(getMemberId()
+ ": The group (" + requestorGroupId + ") of " + requestorId
@@ -1329,7 +1325,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
@Override
public String getGroupId() {
- return RaftServerImpl.this.getGroupId().toString();
+ return getMemberId().getGroupId().toString();
}
@Override
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 5b26419..f202207 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -180,12 +180,12 @@ public interface RaftTestUtil {
}
static void assertLogEntries(RaftServerImpl server, long expectedTerm, SimpleMessage... expectedMessages) {
- LOG.info("checking raft log for " + server.getId());
+ LOG.info("checking raft log for {}", server.getMemberId());
final RaftLog log = server.getState().getLog();
try {
RaftTestUtil.assertLogEntries(log, expectedTerm, expectedMessages);
} catch (AssertionError e) {
- LOG.error(server.getId() + ": Unexpected raft log", e);
+ LOG.error("Unexpected raft log in {}", server.getMemberId(), e);
throw e;
}
}