You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/08/13 22:08:55 UTC
incubator-ratis git commit: RATIS-290. Raft server should notify the
state machine if no leader is assigned for a long time. Contributed by Mukul
Kumar Singh
Repository: incubator-ratis
Updated Branches:
refs/heads/master 86db875aa -> 67db67efd
RATIS-290. Raft server should notify the state machine if no leader is assigned for a long time. Contributed by Mukul Kumar Singh
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/67db67ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/67db67ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/67db67ef
Branch: refs/heads/master
Commit: 67db67efd4a32b1ec3054571c0a8d885f4db882b
Parents: 86db875
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Mon Aug 13 15:08:04 2018 -0700
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Mon Aug 13 15:08:04 2018 -0700
----------------------------------------------------------------------
ratis-proto-shaded/src/main/proto/Raft.proto | 2 +-
.../ratis/server/RaftServerConfigKeys.java | 15 +++
.../apache/ratis/server/impl/LogAppender.java | 2 +-
.../ratis/server/impl/RaftServerImpl.java | 14 ++-
.../apache/ratis/server/impl/ServerState.java | 33 ++++++-
.../apache/ratis/statemachine/StateMachine.java | 16 +++-
.../TestRaftServerLeaderElectionTimeout.java | 98 ++++++++++++++++++++
.../ratis/TestRaftServerSlownessDetection.java | 2 +-
.../SimpleStateMachine4Testing.java | 14 ++-
9 files changed, 184 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index 09fd2fd..039a3f6 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -270,7 +270,7 @@ message FollowerInfoProto {
}
message CandidateInfoProto {
- // nothing to add for candidate
+ uint64 lastLeaderElapsedTimeMs = 1;
}
message RoleInfoProto {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 9e7d83a..4d5abf5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -54,6 +54,21 @@ public interface RaftServerConfigKeys {
setInt(properties::setInt, STAGING_CATCHUP_GAP_KEY, stagingCatchupGap);
}
+ /**
+ * Timeout for leader election after which the state machine of the server is notified
+ * that leader election has been pending for a long time.
+ */
+ String LEADER_ELECTION_TIMEOUT_KEY = PREFIX + ".leader.election.timeout";
+ TimeDuration LEADER_ELECTION_TIMEOUT_DEFAULT = TimeDuration.valueOf(60, TimeUnit.SECONDS);
+ static TimeDuration leaderElectionTimeout(RaftProperties properties) {
+ return getTimeDuration(properties.getTimeDuration(LEADER_ELECTION_TIMEOUT_DEFAULT.getUnit()),
+ LEADER_ELECTION_TIMEOUT_KEY, LEADER_ELECTION_TIMEOUT_DEFAULT);
+ }
+ static void setLeaderElectionTimeout(RaftProperties properties, TimeDuration leaderElectionTimeout) {
+ setTimeDuration(properties::setTimeDuration, LEADER_ELECTION_TIMEOUT_KEY, leaderElectionTimeout);
+
+ }
+
interface Log {
String PREFIX = RaftServerConfigKeys.PREFIX + ".log";
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index b0d47e2..58f5525 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -499,7 +499,7 @@ public class LogAppender {
protected void checkSlowness() {
if (follower.isSlow()) {
- server.getStateMachine().notifySlowness(server.getRaftConf(), server.getRoleInfoProto());
+ server.getStateMachine().notifySlowness(server.getGroup(), server.getRoleInfoProto());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f2114b9..291c5fe 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -229,6 +229,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
return getState().getRaftConf();
}
+ RaftGroup getGroup() {
+ return new RaftGroup(groupId, getRaftConf().getPeers());
+ }
+
void shutdown() {
lifeCycle.checkStateAndClose(() -> {
try {
@@ -373,9 +377,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
}
ServerInformationReply getServerInformation(ServerInformationRequest request) {
- final RaftGroup group = new RaftGroup(groupId, getRaftConf().getPeers());
return new ServerInformationReply(request, getRoleInfoProto(),
- state.getStorage().getStorageDir().hasMetaFile(), getCommitInfos(), group);
+ state.getStorage().getStorageDir().hasMetaFile(), getCommitInfos(), getGroup());
}
public RoleInfoProto getRoleInfoProto() {
@@ -386,7 +389,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
.setRoleElapsedTimeMs(role.getRoleElapsedTimeMs());
switch (currentRole) {
case CANDIDATE:
- roleInfo.setCandidateInfo(CandidateInfoProto.getDefaultInstance());
+ CandidateInfoProto.Builder candidate = CandidateInfoProto.newBuilder()
+ .setLastLeaderElapsedTimeMs(state.getLastLeaderElapsedTimeMs());
+ roleInfo.setCandidateInfo(candidate);
break;
case FOLLOWER:
@@ -429,6 +434,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
Preconditions.assertTrue(isFollower());
shutdownHeartbeatMonitor();
setRole(RaftPeerRole.CANDIDATE, "changeToCandidate");
+ if (state.checkForExtendedNoLeader()) {
+ stateMachine.notifyExtendedNoLeader(getGroup(), getRoleInfoProto());
+ }
// start election
electionDaemon = new LeaderElection(this);
electionDaemon.start();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index eb67fac..c5a6a98 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -28,11 +28,14 @@ import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.Timestamp;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.apache.ratis.server.impl.RaftServerImpl.LOG;
@@ -53,6 +56,8 @@ public class ServerState implements Closeable {
/** local storage for log and snapshot */
private final RaftStorage storage;
private final SnapshotManager snapshotManager;
+ private volatile Timestamp lastNoLeaderTime;
+ private final long leaderElectionTimeoutMs;
/**
* Latest term server has seen. initialized to 0 on first boot, increases
@@ -92,7 +97,12 @@ public class ServerState implements Closeable {
long lastApplied = initStatemachine(stateMachine, group.getGroupId());
+ // On start the leader is null, so start the no-leader clock now
leaderId = null;
+ this.lastNoLeaderTime = new Timestamp();
+ this.leaderElectionTimeoutMs =
+ RaftServerConfigKeys.leaderElectionTimeout(prop).toInt(TimeUnit.MILLISECONDS);
+
// we cannot apply log entries to the state machine in this step, since we
// do not know whether the local log entries have been committed.
log = initLog(id, prop, lastApplied, entry -> {
@@ -207,12 +217,31 @@ public class ServerState implements Closeable {
void setLeader(RaftPeerId newLeaderId, String op) {
if (!Objects.equals(leaderId, newLeaderId)) {
- LOG.info("{}: change Leader from {} to {} at term {} for {}",
- selfId, leaderId, newLeaderId, getCurrentTerm(), op);
+ String suffix;
+ if (newLeaderId == null) {
+ // reset the timestamp when a null leader is assigned
+ lastNoLeaderTime = new Timestamp();
+ suffix = "";
+ } else {
+ Timestamp previous = lastNoLeaderTime;
+ lastNoLeaderTime = null;
+ suffix = ", leader elected after " + previous.elapsedTimeMs() + "ms";
+ }
+ LOG.info("{}: change Leader from {} to {} at term {} for {}{}",
+ selfId, leaderId, newLeaderId, getCurrentTerm(), op, suffix);
leaderId = newLeaderId;
}
}
+ boolean checkForExtendedNoLeader() {
+ return getLastLeaderElapsedTimeMs() > leaderElectionTimeoutMs;
+ }
+
+ long getLastLeaderElapsedTimeMs() {
+ final Timestamp t = lastNoLeaderTime;
+ return t == null ? 0 : t.elapsedTimeMs();
+ }
+
void becomeLeader() {
setLeader(selfId, "becomeLeader");
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 31ca468..88b5276 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -19,6 +19,7 @@ package org.apache.ratis.statemachine;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -213,10 +214,21 @@ public interface StateMachine extends Closeable {
* Notify the Leader's state machine that one of the followers is slow
* this notification is based on "raft.server.rpc.slowness.timeout"
*
- * @param raftConfiguration raft configuration
+ * @param group raft group information
* @param roleInfoProto information about the current node role and rpc delay information
*/
- default void notifySlowness(RaftConfiguration raftConfiguration, RoleInfoProto roleInfoProto) {
+ default void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
+
+ }
+
+ /**
+ * Notify this server's state machine that a leader has not been elected for a long time.
+ * This notification is based on "raft.server.leader.election.timeout".
+ *
+ * @param group raft group information
+ * @param roleInfoProto information about the current node role
+ */
+ default void notifyExtendedNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
new file mode 100644
index 0000000..67156a1
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
+import org.apache.ratis.shaded.proto.RaftProtos;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test Raft Server Leader election timeout detection and notification to state machine.
+ */
+public class TestRaftServerLeaderElectionTimeout extends BaseTest {
+ static {
+ LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+ LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+ }
+
+ public static final int NUM_SERVERS = 3;
+
+ protected static final RaftProperties properties = new RaftProperties();
+
+ private final MiniRaftClusterWithSimulatedRpc cluster = MiniRaftClusterWithSimulatedRpc
+ .FACTORY.newCluster(NUM_SERVERS, getProperties());
+
+ public RaftProperties getProperties() {
+ RaftServerConfigKeys
+ .setLeaderElectionTimeout(properties, TimeDuration.valueOf(1, TimeUnit.SECONDS));
+ properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+ SimpleStateMachine4Testing.class, StateMachine.class);
+ return properties;
+ }
+
+ @Before
+ public void setup() {
+ Assert.assertNull(cluster.getLeader());
+ cluster.start();
+ }
+
+ @After
+ public void tearDown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testLeaderElectionDetection() throws Exception {
+ RaftTestUtil.waitForLeader(cluster);
+ long leaderElectionTimeout = RaftServerConfigKeys.
+ leaderElectionTimeout(cluster.getProperties()).toInt(TimeUnit.MILLISECONDS);
+
+ RaftServerImpl healthyFollower = cluster.getFollowers().get(1);
+ RaftServerImpl failedFollower = cluster.getFollowers().get(0);
+ // fail the leader and one of the followers so that a quorum is not present
+ // for the next leader election to succeed.
+ cluster.killServer(failedFollower.getId());
+ cluster.killServer(cluster.getLeader().getId());
+
+ // Wait to ensure that leader election is triggered and that the state machine callback is invoked
+ Thread.sleep( leaderElectionTimeout * 2);
+
+ RaftProtos.RoleInfoProto roleInfoProto =
+ SimpleStateMachine4Testing.get(healthyFollower).getLeaderElectionTimeoutInfo();
+ Assert.assertNotNull(roleInfoProto);
+
+ Assert.assertEquals(roleInfoProto.getRole(), RaftProtos.RaftPeerRole.CANDIDATE);
+ Assert.assertTrue(roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs() > leaderElectionTimeout);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
index 17c41a5..811e7de 100644
--- a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
+++ b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
@@ -84,7 +84,7 @@ public class TestRaftServerSlownessDetection extends BaseTest {
RaftServerImpl failedFollower = cluster.getFollowers().get(0);
// fail the node and wait for the callback to be triggered
- failedFollower.getProxy().close();
+ cluster.killServer(failedFollower.getId());
Thread.sleep( slownessTimeout * 2);
// Followers should not get any failed not notification
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 9f6efae..5554da2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -26,7 +26,7 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.protocol.TermIndex;
@@ -86,6 +86,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
private final Semaphore blockingSemaphore = new Semaphore(1);
private long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX;
private RoleInfoProto slownessInfo = null;
+ private RoleInfoProto leaderElectionTimeoutInfo = null;
SimpleStateMachine4Testing() {
checkpointer = new Daemon(() -> {
@@ -106,6 +107,10 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
return slownessInfo;
}
+ public RoleInfoProto getLeaderElectionTimeoutInfo() {
+ return leaderElectionTimeoutInfo;
+ }
+
@Override
public synchronized void initialize(RaftServer server, RaftGroupId groupId,
RaftStorage raftStorage) throws IOException {
@@ -326,7 +331,12 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
}
@Override
- public void notifySlowness(RaftConfiguration raftConfiguration, RoleInfoProto roleInfoProto) {
+ public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
slownessInfo = roleInfoProto;
}
+
+ @Override
+ public void notifyExtendedNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) {
+ leaderElectionTimeoutInfo = roleInfoProto;
+ }
}