You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/08/13 22:08:55 UTC

incubator-ratis git commit: RATIS-290. Raft server should notify the state machine if no leader is assigned for a long time. Contributed by Mukul Kumar Singh

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 86db875aa -> 67db67efd


RATIS-290. Raft server should notify the state machine if no leader is assigned for a long time.  Contributed by Mukul Kumar Singh


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/67db67ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/67db67ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/67db67ef

Branch: refs/heads/master
Commit: 67db67efd4a32b1ec3054571c0a8d885f4db882b
Parents: 86db875
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Mon Aug 13 15:08:04 2018 -0700
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Mon Aug 13 15:08:04 2018 -0700

----------------------------------------------------------------------
 ratis-proto-shaded/src/main/proto/Raft.proto    |  2 +-
 .../ratis/server/RaftServerConfigKeys.java      | 15 +++
 .../apache/ratis/server/impl/LogAppender.java   |  2 +-
 .../ratis/server/impl/RaftServerImpl.java       | 14 ++-
 .../apache/ratis/server/impl/ServerState.java   | 33 ++++++-
 .../apache/ratis/statemachine/StateMachine.java | 16 +++-
 .../TestRaftServerLeaderElectionTimeout.java    | 98 ++++++++++++++++++++
 .../ratis/TestRaftServerSlownessDetection.java  |  2 +-
 .../SimpleStateMachine4Testing.java             | 14 ++-
 9 files changed, 184 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index 09fd2fd..039a3f6 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -270,7 +270,7 @@ message FollowerInfoProto {
 }
 
 message CandidateInfoProto {
-  // nothing to add for candidate
+  uint64 lastLeaderElapsedTimeMs = 1;
 }
 
 message RoleInfoProto {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 9e7d83a..4d5abf5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -54,6 +54,21 @@ public interface RaftServerConfigKeys {
     setInt(properties::setInt, STAGING_CATCHUP_GAP_KEY, stagingCatchupGap);
   }
 
+  /**
+   * Duration without an elected leader after which the state machine of the server
+   * is notified that leader election has been pending for a long time.
+   */
+  String LEADER_ELECTION_TIMEOUT_KEY = PREFIX + ".leader.election.timeout";
+  TimeDuration LEADER_ELECTION_TIMEOUT_DEFAULT = TimeDuration.valueOf(60, TimeUnit.SECONDS);
+  static TimeDuration leaderElectionTimeout(RaftProperties properties) {
+    return getTimeDuration(properties.getTimeDuration(LEADER_ELECTION_TIMEOUT_DEFAULT.getUnit()),
+        LEADER_ELECTION_TIMEOUT_KEY, LEADER_ELECTION_TIMEOUT_DEFAULT);
+  }
+  static void setLeaderElectionTimeout(RaftProperties properties, TimeDuration leaderElectionTimeout) {
+    setTimeDuration(properties::setTimeDuration, LEADER_ELECTION_TIMEOUT_KEY, leaderElectionTimeout);
+
+  }
+
   interface Log {
     String PREFIX = RaftServerConfigKeys.PREFIX + ".log";
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index b0d47e2..58f5525 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -499,7 +499,7 @@ public class LogAppender {
 
   protected void checkSlowness() {
     if (follower.isSlow()) {
-      server.getStateMachine().notifySlowness(server.getRaftConf(), server.getRoleInfoProto());
+      server.getStateMachine().notifySlowness(server.getGroup(), server.getRoleInfoProto());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f2114b9..291c5fe 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -229,6 +229,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     return getState().getRaftConf();
   }
 
+  RaftGroup getGroup() {
+    return new RaftGroup(groupId, getRaftConf().getPeers());
+  }
+
   void shutdown() {
     lifeCycle.checkStateAndClose(() -> {
       try {
@@ -373,9 +377,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
   }
 
   ServerInformationReply getServerInformation(ServerInformationRequest request) {
-    final RaftGroup group = new RaftGroup(groupId, getRaftConf().getPeers());
     return new ServerInformationReply(request, getRoleInfoProto(),
-        state.getStorage().getStorageDir().hasMetaFile(), getCommitInfos(), group);
+        state.getStorage().getStorageDir().hasMetaFile(), getCommitInfos(), getGroup());
   }
 
   public RoleInfoProto getRoleInfoProto() {
@@ -386,7 +389,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
         .setRoleElapsedTimeMs(role.getRoleElapsedTimeMs());
     switch (currentRole) {
     case CANDIDATE:
-      roleInfo.setCandidateInfo(CandidateInfoProto.getDefaultInstance());
+      CandidateInfoProto.Builder candidate = CandidateInfoProto.newBuilder()
+          .setLastLeaderElapsedTimeMs(state.getLastLeaderElapsedTimeMs());
+      roleInfo.setCandidateInfo(candidate);
       break;
 
     case FOLLOWER:
@@ -429,6 +434,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     Preconditions.assertTrue(isFollower());
     shutdownHeartbeatMonitor();
     setRole(RaftPeerRole.CANDIDATE, "changeToCandidate");
+    if (state.checkForExtendedNoLeader()) {
+      stateMachine.notifyExtendedNoLeader(getGroup(), getRoleInfoProto());
+    }
     // start election
     electionDaemon = new LeaderElection(this);
     electionDaemon.start();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index eb67fac..c5a6a98 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -28,11 +28,14 @@ import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.Timestamp;
 
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
 import static org.apache.ratis.server.impl.RaftServerImpl.LOG;
@@ -53,6 +56,8 @@ public class ServerState implements Closeable {
   /** local storage for log and snapshot */
   private final RaftStorage storage;
   private final SnapshotManager snapshotManager;
+  private volatile Timestamp lastNoLeaderTime;
+  private final long leaderElectionTimeoutMs;
 
   /**
    * Latest term server has seen. initialized to 0 on first boot, increases
@@ -92,7 +97,12 @@ public class ServerState implements Closeable {
 
     long lastApplied = initStatemachine(stateMachine, group.getGroupId());
 
+    // On start the leader is null, so start the no-leader clock now
     leaderId = null;
+    this.lastNoLeaderTime = new Timestamp();
+    this.leaderElectionTimeoutMs =
+        RaftServerConfigKeys.leaderElectionTimeout(prop).toInt(TimeUnit.MILLISECONDS);
+
     // we cannot apply log entries to the state machine in this step, since we
     // do not know whether the local log entries have been committed.
     log = initLog(id, prop, lastApplied, entry -> {
@@ -207,12 +217,31 @@ public class ServerState implements Closeable {
 
   void setLeader(RaftPeerId newLeaderId, String op) {
     if (!Objects.equals(leaderId, newLeaderId)) {
-      LOG.info("{}: change Leader from {} to {} at term {} for {}",
-          selfId, leaderId, newLeaderId, getCurrentTerm(), op);
+      String suffix;
+      if (newLeaderId == null) {
+        // reset the time stamp when a null leader is assigned
+        lastNoLeaderTime = new Timestamp();
+        suffix = "";
+      } else {
+        Timestamp previous = lastNoLeaderTime;
+        lastNoLeaderTime = null;
+        suffix = ", leader elected after " + previous.elapsedTimeMs() + "ms";
+      }
+      LOG.info("{}: change Leader from {} to {} at term {} for {}{}",
+          selfId, leaderId, newLeaderId, getCurrentTerm(), op, suffix);
       leaderId = newLeaderId;
     }
   }
 
+  boolean checkForExtendedNoLeader() {
+    return getLastLeaderElapsedTimeMs() > leaderElectionTimeoutMs;
+  }
+
+  long getLastLeaderElapsedTimeMs() {
+    final Timestamp t = lastNoLeaderTime;
+    return t == null ? 0 : t.elapsedTimeMs();
+  }
+
   void becomeLeader() {
     setLeader(selfId, "becomeLeader");
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 31ca468..88b5276 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -19,6 +19,7 @@ package org.apache.ratis.statemachine;
 
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -213,10 +214,21 @@ public interface StateMachine extends Closeable {
    * Notify the Leader's state machine that one of the followers is slow
    * this notification is based on "raft.server.rpc.slowness.timeout"
    *
-   * @param raftConfiguration raft configuration
+   * @param group raft group information
    * @param roleInfoProto information about the current node role and rpc delay information
    */
-  default void notifySlowness(RaftConfiguration raftConfiguration, RoleInfoProto roleInfoProto) {
+  default void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
+
+  }
+
+  /**
+   * Notify the state machine that a leader has not been elected for a long time;
+   * this notification is based on "raft.server.leader.election.timeout".
+   *
+   * @param group raft group information
+   * @param roleInfoProto information about the current node role and, for a candidate, the elapsed time without a leader
+   */
+  default void notifyExtendedNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) {
 
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
new file mode 100644
index 0000000..67156a1
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
+import org.apache.ratis.shaded.proto.RaftProtos;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test Raft Server Leader election timeout detection and notification to state machine.
+ */
+public class TestRaftServerLeaderElectionTimeout extends BaseTest {
+  static {
+    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  public static final int NUM_SERVERS = 3;
+
+  protected static final RaftProperties properties = new RaftProperties();
+
+  private final MiniRaftClusterWithSimulatedRpc cluster = MiniRaftClusterWithSimulatedRpc
+      .FACTORY.newCluster(NUM_SERVERS, getProperties());
+
+  public RaftProperties getProperties() {
+    RaftServerConfigKeys
+        .setLeaderElectionTimeout(properties, TimeDuration.valueOf(1, TimeUnit.SECONDS));
+    properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        SimpleStateMachine4Testing.class, StateMachine.class);
+    return properties;
+  }
+
+  @Before
+  public void setup() {
+    Assert.assertNull(cluster.getLeader());
+    cluster.start();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testLeaderElectionDetection() throws Exception {
+    RaftTestUtil.waitForLeader(cluster);
+    long leaderElectionTimeout = RaftServerConfigKeys.
+        leaderElectionTimeout(cluster.getProperties()).toInt(TimeUnit.MILLISECONDS);
+
+    RaftServerImpl healthyFollower = cluster.getFollowers().get(1);
+    RaftServerImpl failedFollower = cluster.getFollowers().get(0);
+    // fail the leader and one of the followers so that no quorum is present
+    // and the next leader election cannot succeed.
+    cluster.killServer(failedFollower.getId());
+    cluster.killServer(cluster.getLeader().getId());
+
+    // Wait to ensure that leader election is triggered and also state machine callback is triggered
+    Thread.sleep( leaderElectionTimeout * 2);
+
+    RaftProtos.RoleInfoProto roleInfoProto =
+        SimpleStateMachine4Testing.get(healthyFollower).getLeaderElectionTimeoutInfo();
+    Assert.assertNotNull(roleInfoProto);
+
+    Assert.assertEquals(roleInfoProto.getRole(), RaftProtos.RaftPeerRole.CANDIDATE);
+    Assert.assertTrue(roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs() > leaderElectionTimeout);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
index 17c41a5..811e7de 100644
--- a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
+++ b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
@@ -84,7 +84,7 @@ public class TestRaftServerSlownessDetection extends BaseTest {
     RaftServerImpl failedFollower = cluster.getFollowers().get(0);
 
     // fail the node and wait for the callback to be triggered
-    failedFollower.getProxy().close();
+    cluster.killServer(failedFollower.getId());
     Thread.sleep( slownessTimeout * 2);
 
     // Followers should not get any failed not notification

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 9f6efae..5554da2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -26,7 +26,7 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -86,6 +86,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
   private final Semaphore blockingSemaphore = new Semaphore(1);
   private long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX;
   private RoleInfoProto slownessInfo = null;
+  private RoleInfoProto leaderElectionTimeoutInfo = null;
 
   SimpleStateMachine4Testing() {
     checkpointer = new Daemon(() -> {
@@ -106,6 +107,10 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
     return slownessInfo;
   }
 
+  public RoleInfoProto getLeaderElectionTimeoutInfo() {
+    return leaderElectionTimeoutInfo;
+  }
+
   @Override
   public synchronized void initialize(RaftServer server, RaftGroupId groupId,
       RaftStorage raftStorage) throws IOException {
@@ -326,7 +331,12 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
   }
 
   @Override
-  public void notifySlowness(RaftConfiguration raftConfiguration, RoleInfoProto roleInfoProto) {
+  public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
     slownessInfo = roleInfoProto;
   }
+
+  @Override
+  public void notifyExtendedNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) {
+    leaderElectionTimeoutInfo = roleInfoProto;
+  }
 }