You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2021/04/19 10:31:43 UTC
[ratis] branch master updated: RATIS-1356. NotifyInstallSnapshot during SetConfiguration has leader … (#460)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 80f6e4b RATIS-1356. NotifyInstallSnapshot during SetConfiguration has leader … (#460)
80f6e4b is described below
commit 80f6e4b3825860bf50f1253a4ee2cc5240bd194b
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Mon Apr 19 16:01:37 2021 +0530
RATIS-1356. NotifyInstallSnapshot during SetConfiguration has leader … (#460)
---
.../apache/ratis/server/impl/RaftServerImpl.java | 33 ++++++++++++++++++++--
.../ratis/InstallSnapshotNotificationTests.java | 6 ++--
.../org/apache/ratis/RaftExceptionBaseTest.java | 3 +-
.../apache/ratis/server/impl/MiniRaftCluster.java | 33 ++++++++++++++++------
.../server/impl/RaftReconfigurationBaseTest.java | 9 ++----
.../ratis/statemachine/RaftSnapshotBaseTest.java | 2 +-
6 files changed, 64 insertions(+), 22 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 01046f9..ccead21 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -512,6 +512,21 @@ class RaftServerImpl implements RaftServer.Division,
getGroup(), getRoleInfoProto(), state.getStorage().getStorageDir().isHealthy());
}
+ private RoleInfoProto getRoleInfoProto(RaftPeer leaderPeerInfo) {
+ RaftPeerRole currentRole = role.getCurrentRole();
+ RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
+ .setSelf(getPeer().getRaftPeerProto())
+ .setRole(currentRole)
+ .setRoleElapsedTimeMs(role.getRoleElapsedTimeMs());
+ final Optional<FollowerState> fs = role.getFollowerState();
+ final ServerRpcProto leaderInfo =
+ ServerProtoUtils.toServerRpcProto(leaderPeerInfo,
+ fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L));
+ roleInfo.setFollowerInfo(FollowerInfoProto.newBuilder().setLeaderInfo(leaderInfo)
+ .setOutstandingOp(fs.map(FollowerState::getOutstandingOp).orElse(0)));
+ return roleInfo.build();
+ }
+
RoleInfoProto getRoleInfoProto() {
RaftPeerRole currentRole = role.getCurrentRole();
RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
@@ -1545,13 +1560,27 @@ class RaftServerImpl implements RaftServer.Division,
return reply;
}
+ Optional<RaftPeerProto> leaderPeerInfo = null;
+ if (request.hasLastRaftConfigurationLogEntryProto()) {
+ List<RaftPeerProto> peerList = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry()
+ .getPeersList();
+ leaderPeerInfo = peerList.stream().filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)).findFirst();
+ Preconditions.assertTrue(leaderPeerInfo.isPresent());
+ }
+
+ // For the cases where RaftConf is empty on newly started peer with
+ // empty peer list, we retrieve leader info from
+ // installSnapShotRequestProto.
+ RoleInfoProto roleInfoProto =
+ getRaftConf().getPeer(state.getLeaderId()) == null ?
+ getRoleInfoProto(ProtoUtils.toRaftPeer(leaderPeerInfo.get())) :
+ getRoleInfoProto();
// This is the first installSnapshot notify request for this term and
// index. Notify the state machine to install the snapshot.
LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.",
getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex);
-
try {
- stateMachine.followerEvent().notifyInstallSnapshotFromLeader(getRoleInfoProto(), firstAvailableLogTermIndex)
+ stateMachine.followerEvent().notifyInstallSnapshotFromLeader(roleInfoProto, firstAvailableLogTermIndex)
.whenComplete((reply, exception) -> {
if (exception != null) {
LOG.warn("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}",
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index c690ea1..675ee55 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -175,7 +175,8 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
Assert.assertTrue(set);
// add two more peers
- final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true);
+ final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
+ true);
// trigger setConfiguration
cluster.setConfiguration(change.allPeersInNewConf);
@@ -317,7 +318,8 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
}
// Add two more peers who will need snapshots from the leader.
- final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true);
+ final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
+ true);
// trigger setConfiguration
cluster.setConfiguration(change.allPeersInNewConf);
RaftServerTestUtil
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 5a51997..09b057b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -109,7 +109,8 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
final RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, oldLeader);
// add two more peers
- MiniRaftCluster.PeerChanges change = cluster.addNewPeers(new String[]{"ss1", "ss2"}, true);
+ MiniRaftCluster.PeerChanges change = cluster.addNewPeers(new String[]{
+ "ss1", "ss2"}, true, false);
// trigger setConfiguration
LOG.info("Start changing the configuration: {}", Arrays.asList(change.allPeersInNewConf));
try (final RaftClient c2 = cluster.createClient(newLeader)) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 2d2c219..955bb42 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -283,7 +283,8 @@ public abstract class MiniRaftCluster implements Closeable {
public MiniRaftCluster initServers() {
LOG.info("servers = " + servers);
if (servers.isEmpty()) {
- putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId), true);
+ putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId),
+ true, false);
}
return this;
}
@@ -296,10 +297,18 @@ public abstract class MiniRaftCluster implements Closeable {
}
private Collection<RaftServer> putNewServers(
- Iterable<RaftPeerId> peers, boolean format) {
- return StreamSupport.stream(peers.spliterator(), false)
- .map(id -> putNewServer(id, group, format))
- .collect(Collectors.toList());
+ Iterable<RaftPeerId> peers, boolean format, boolean emptyPeer) {
+ if (emptyPeer) {
+ RaftGroup raftGroup = RaftGroup.valueOf(group.getGroupId(),
+ Collections.EMPTY_LIST);
+ return StreamSupport.stream(peers.spliterator(), false)
+ .map(id -> putNewServer(id, raftGroup, format))
+ .collect(Collectors.toList());
+ } else {
+ return StreamSupport.stream(peers.spliterator(), false)
+ .map(id -> putNewServer(id, group, format))
+ .collect(Collectors.toList());
+ }
}
public void start() throws IOException {
@@ -337,7 +346,7 @@ public abstract class MiniRaftCluster implements Closeable {
List<RaftPeerId> idList = new ArrayList<>(servers.keySet());
servers.clear();
- putNewServers(idList, format);
+ putNewServers(idList, format, false);
start();
}
@@ -406,15 +415,21 @@ public abstract class MiniRaftCluster implements Closeable {
public PeerChanges addNewPeers(int number, boolean startNewPeer)
throws IOException {
- return addNewPeers(generateIds(number, servers.size()), startNewPeer);
+ return addNewPeers(generateIds(number, servers.size()), startNewPeer, false);
+ }
+
+ public PeerChanges addNewPeers(int number, boolean startNewPeer,
+ boolean emptyPeer) throws IOException {
+ return addNewPeers(generateIds(number, servers.size()), startNewPeer, emptyPeer);
}
- public PeerChanges addNewPeers(String[] ids, boolean startNewPeer) throws IOException {
+ public PeerChanges addNewPeers(String[] ids, boolean startNewPeer,
+ boolean emptyPeer) throws IOException {
LOG.info("Add new peers {}", Arrays.asList(ids));
// create and add new RaftServers
final Collection<RaftServer> newServers = putNewServers(
- CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf), true);
+ CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf), true, emptyPeer);
startServers(newServers);
if (!startNewPeer) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 6e75224..22b60da 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -361,7 +361,8 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
}
FIVE_SECONDS.sleep();
LOG.info(cluster.printServers());
- assertSuccess(success);
+
+ RaftTestUtil.waitFor(() -> success.get(), 300, 15000);
final RaftLog leaderLog = cluster.getLeader().getRaftLog();
for (RaftPeer newPeer : c1.newPeers) {
@@ -452,12 +453,6 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
}
}
- static void assertSuccess(final AtomicReference<Boolean> success) {
- final String s = "success=" + success;
- Assert.assertNotNull(s, success.get());
- Assert.assertTrue(s, success.get());
- }
-
/**
* When a request's new configuration is the same with the current one, make
* sure we return success immediately and no log entry is recorded.
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 714ff68..9837fe3 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -230,7 +230,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
// add two more peers
String[] newPeers = new String[]{"s3", "s4"};
MiniRaftCluster.PeerChanges change = cluster.addNewPeers(
- newPeers, true);
+ newPeers, true, false);
// trigger setConfiguration
cluster.setConfiguration(change.allPeersInNewConf);