You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2021/04/19 10:31:43 UTC

[ratis] branch master updated: RATIS-1356. NotifyInstallSnapshot during SetConfiguration has leader … (#460)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 80f6e4b  RATIS-1356. NotifyInstallSnapshot during SetConfiguration has leader … (#460)
80f6e4b is described below

commit 80f6e4b3825860bf50f1253a4ee2cc5240bd194b
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Mon Apr 19 16:01:37 2021 +0530

    RATIS-1356. NotifyInstallSnapshot during SetConfiguration has leader … (#460)
---
 .../apache/ratis/server/impl/RaftServerImpl.java   | 33 ++++++++++++++++++++--
 .../ratis/InstallSnapshotNotificationTests.java    |  6 ++--
 .../org/apache/ratis/RaftExceptionBaseTest.java    |  3 +-
 .../apache/ratis/server/impl/MiniRaftCluster.java  | 33 ++++++++++++++++------
 .../server/impl/RaftReconfigurationBaseTest.java   |  9 ++----
 .../ratis/statemachine/RaftSnapshotBaseTest.java   |  2 +-
 6 files changed, 64 insertions(+), 22 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 01046f9..ccead21 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -512,6 +512,21 @@ class RaftServerImpl implements RaftServer.Division,
         getGroup(), getRoleInfoProto(), state.getStorage().getStorageDir().isHealthy());
   }
 
+  private RoleInfoProto getRoleInfoProto(RaftPeer leaderPeerInfo) {
+    RaftPeerRole currentRole = role.getCurrentRole();
+    RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
+        .setSelf(getPeer().getRaftPeerProto())
+        .setRole(currentRole)
+        .setRoleElapsedTimeMs(role.getRoleElapsedTimeMs());
+    final Optional<FollowerState> fs = role.getFollowerState();
+    final ServerRpcProto leaderInfo =
+        ServerProtoUtils.toServerRpcProto(leaderPeerInfo,
+            fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L));
+    roleInfo.setFollowerInfo(FollowerInfoProto.newBuilder().setLeaderInfo(leaderInfo)
+        .setOutstandingOp(fs.map(FollowerState::getOutstandingOp).orElse(0)));
+    return roleInfo.build();
+  }
+
   RoleInfoProto getRoleInfoProto() {
     RaftPeerRole currentRole = role.getCurrentRole();
     RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
@@ -1545,13 +1560,27 @@ class RaftServerImpl implements RaftServer.Division,
           return reply;
         }
 
+        Optional<RaftPeerProto> leaderPeerInfo = null;
+        if (request.hasLastRaftConfigurationLogEntryProto()) {
+          List<RaftPeerProto> peerList = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry()
+              .getPeersList();
+          leaderPeerInfo = peerList.stream().filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)).findFirst();
+          Preconditions.assertTrue(leaderPeerInfo.isPresent());
+        }
+
+        // For the cases where RaftConf is empty on newly started peer with
+        // empty peer list, we retrieve leader info from
+        // installSnapShotRequestProto.
+        RoleInfoProto roleInfoProto =
+            getRaftConf().getPeer(state.getLeaderId()) == null ?
+                getRoleInfoProto(ProtoUtils.toRaftPeer(leaderPeerInfo.get())) :
+                getRoleInfoProto();
         // This is the first installSnapshot notify request for this term and
         // index. Notify the state machine to install the snapshot.
         LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.",
             getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex);
-
         try {
-          stateMachine.followerEvent().notifyInstallSnapshotFromLeader(getRoleInfoProto(), firstAvailableLogTermIndex)
+          stateMachine.followerEvent().notifyInstallSnapshotFromLeader(roleInfoProto, firstAvailableLogTermIndex)
               .whenComplete((reply, exception) -> {
                 if (exception != null) {
                   LOG.warn("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}",
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index c690ea1..675ee55 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -175,7 +175,8 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
       Assert.assertTrue(set);
 
       // add two more peers
-      final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true);
+      final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
+          true);
       // trigger setConfiguration
       cluster.setConfiguration(change.allPeersInNewConf);
 
@@ -317,7 +318,8 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
       }
 
       // Add two more peers who will need snapshots from the leader.
-      final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true);
+      final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
+          true);
       // trigger setConfiguration
       cluster.setConfiguration(change.allPeersInNewConf);
       RaftServerTestUtil
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 5a51997..09b057b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -109,7 +109,8 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
       final RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, oldLeader);
 
       // add two more peers
-      MiniRaftCluster.PeerChanges change = cluster.addNewPeers(new String[]{"ss1", "ss2"}, true);
+      MiniRaftCluster.PeerChanges change = cluster.addNewPeers(new String[]{
+          "ss1", "ss2"}, true, false);
       // trigger setConfiguration
       LOG.info("Start changing the configuration: {}", Arrays.asList(change.allPeersInNewConf));
       try (final RaftClient c2 = cluster.createClient(newLeader)) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 2d2c219..955bb42 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -283,7 +283,8 @@ public abstract class MiniRaftCluster implements Closeable {
   public MiniRaftCluster initServers() {
     LOG.info("servers = " + servers);
     if (servers.isEmpty()) {
-      putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId), true);
+      putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId),
+          true, false);
     }
     return this;
   }
@@ -296,10 +297,18 @@ public abstract class MiniRaftCluster implements Closeable {
   }
 
   private Collection<RaftServer> putNewServers(
-      Iterable<RaftPeerId> peers, boolean format) {
-    return StreamSupport.stream(peers.spliterator(), false)
-        .map(id -> putNewServer(id, group, format))
-        .collect(Collectors.toList());
+      Iterable<RaftPeerId> peers, boolean format, boolean emptyPeer) {
+    if (emptyPeer) {
+      RaftGroup raftGroup = RaftGroup.valueOf(group.getGroupId(),
+          Collections.EMPTY_LIST);
+      return StreamSupport.stream(peers.spliterator(), false)
+          .map(id -> putNewServer(id, raftGroup, format))
+          .collect(Collectors.toList());
+    } else {
+      return StreamSupport.stream(peers.spliterator(), false)
+          .map(id -> putNewServer(id, group, format))
+          .collect(Collectors.toList());
+    }
   }
 
   public void start() throws IOException {
@@ -337,7 +346,7 @@ public abstract class MiniRaftCluster implements Closeable {
 
     List<RaftPeerId> idList = new ArrayList<>(servers.keySet());
     servers.clear();
-    putNewServers(idList, format);
+    putNewServers(idList, format, false);
     start();
   }
 
@@ -406,15 +415,21 @@ public abstract class MiniRaftCluster implements Closeable {
 
   public PeerChanges addNewPeers(int number, boolean startNewPeer)
       throws IOException {
-    return addNewPeers(generateIds(number, servers.size()), startNewPeer);
+    return addNewPeers(generateIds(number, servers.size()), startNewPeer, false);
+  }
+
+  public PeerChanges addNewPeers(int number, boolean startNewPeer,
+      boolean emptyPeer) throws IOException {
+    return addNewPeers(generateIds(number, servers.size()), startNewPeer, emptyPeer);
   }
 
-  public PeerChanges addNewPeers(String[] ids, boolean startNewPeer) throws IOException {
+  public PeerChanges addNewPeers(String[] ids, boolean startNewPeer,
+      boolean emptyPeer) throws IOException {
     LOG.info("Add new peers {}", Arrays.asList(ids));
 
     // create and add new RaftServers
     final Collection<RaftServer> newServers = putNewServers(
-        CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf), true);
+        CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf), true, emptyPeer);
 
     startServers(newServers);
     if (!startNewPeer) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 6e75224..22b60da 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -361,7 +361,8 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
         }
         FIVE_SECONDS.sleep();
         LOG.info(cluster.printServers());
-        assertSuccess(success);
+
+        RaftTestUtil.waitFor(() -> success.get(), 300, 15000);
 
         final RaftLog leaderLog = cluster.getLeader().getRaftLog();
         for (RaftPeer newPeer : c1.newPeers) {
@@ -452,12 +453,6 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
     }
   }
 
-  static void assertSuccess(final AtomicReference<Boolean> success) {
-    final String s = "success=" + success;
-    Assert.assertNotNull(s, success.get());
-    Assert.assertTrue(s, success.get());
-  }
-
   /**
    * When a request's new configuration is the same with the current one, make
    * sure we return success immediately and no log entry is recorded.
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 714ff68..9837fe3 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -230,7 +230,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
       // add two more peers
       String[] newPeers = new String[]{"s3", "s4"};
       MiniRaftCluster.PeerChanges change = cluster.addNewPeers(
-          newPeers, true);
+          newPeers, true, false);
       // trigger setConfiguration
       cluster.setConfiguration(change.allPeersInNewConf);