You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/08/16 13:18:30 UTC
[ratis] 01/12: RATIS-1640. Add unit-test of listener related to setConfiguration and takeSnapshot (#697)
This is an automated email from the ASF dual-hosted git repository.
dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git
commit ce5357a2eac6aee4e824649776e916b6dbd0fe22
Author: Yaolong Liu <ly...@163.com>
AuthorDate: Wed Jul 27 02:09:05 2022 +0800
RATIS-1640. Add unit-test of listener related to setConfiguration and takeSnapshot (#697)
(cherry picked from commit d3a0f9491f17462555c8fe522cbdc2ea4c88ef3b)
---
.../ratis/server/impl/LeaderElectionTests.java | 88 ++++++++++++++++++++++
.../apache/ratis/server/impl/MiniRaftCluster.java | 6 ++
.../ratis/statemachine/SnapshotManagementTest.java | 30 ++++++++
3 files changed, 124 insertions(+)
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index b988d3f4f..6b5d04b24 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -23,6 +23,7 @@ import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
@@ -46,12 +47,16 @@ import org.junit.Test;
import org.slf4j.Logger;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LAST_LEADER_ELECTION_ELAPSED_TIME;
@@ -312,6 +317,89 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
cluster.setBlockRequestsFrom(id.toString(), false);
}
+  /**
+   * Starts a cluster created with newCluster(3), adds one new peer in the LISTENER role
+   * through setConfiguration, and verifies that the leader's configuration then reports
+   * exactly that peer as its only listener.
+   *
+   * @throws Exception if the cluster, the client or the configuration change fails
+   */
+  @Test
+  public void testAddListener() throws Exception {
+    try (final MiniRaftCluster cluster = newCluster(3)) {
+      cluster.start();
+      final RaftServer.Division leader = waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient(leader.getId())) {
+        client.io().send(new RaftTestUtil.SimpleMessage("message"));
+        List<RaftPeer> servers = cluster.getPeers();
+        // assertEquals takes (expected, actual); this order keeps the failure message truthful.
+        Assert.assertEquals(3, servers.size());
+        MiniRaftCluster.PeerChanges changes = cluster.addNewPeers(1,
+            true, false, RaftProtos.RaftPeerRole.LISTENER);
+        // Existing peers stay in the first list; the newly added peer goes in the listener list.
+        RaftClientReply reply = client.admin().setConfiguration(servers, Arrays.asList(changes.newPeers));
+        Assert.assertTrue(reply.isSuccess());
+        Collection<RaftPeer> listener =
+            leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
+        Assert.assertEquals(1, listener.size());
+        Assert.assertEquals(changes.newPeers[0].getId(), new ArrayList<>(listener).get(0).getId());
+      }
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Starts a cluster created with newCluster(3, 1), checks that it reports one listener,
+   * then calls setConfiguration with only the followers and the leader, and verifies that
+   * the leader's configuration no longer contains any LISTENER peer.
+   *
+   * @throws Exception if the cluster, the client or the configuration change fails
+   */
+  @Test
+  public void testRemoveListener() throws Exception {
+    try (final MiniRaftCluster raftCluster = newCluster(3, 1)) {
+      raftCluster.start();
+      final RaftServer.Division leaderDivision = waitForLeader(raftCluster);
+      try (RaftClient raftClient = raftCluster.createClient(leaderDivision.getId())) {
+        raftClient.io().send(new RaftTestUtil.SimpleMessage("message"));
+        final int listenerCount = raftCluster.getListeners().size();
+        Assert.assertEquals(1, listenerCount);
+
+        // New configuration: every follower plus the leader, with no listener list supplied.
+        final List<RaftPeer> remaining = new ArrayList<>();
+        for (RaftServer.Division follower : raftCluster.getFollowers()) {
+          remaining.add(follower.getPeer());
+        }
+        remaining.add(leaderDivision.getPeer());
+
+        final RaftClientReply setConfReply = raftClient.admin().setConfiguration(remaining);
+        Assert.assertTrue(setConfReply.isSuccess());
+        final Collection<RaftPeer> listenersInConf =
+            leaderDivision.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
+        Assert.assertEquals(0, listenersInConf.size());
+      }
+      raftCluster.shutdown();
+    }
+  }
+
+ /**
+ * Starts a cluster created with newCluster(3), moves one of the two followers into the
+ * listener list passed to setConfiguration, and verifies that the leader's configuration
+ * reports that peer as its only LISTENER.
+ */
+ @Test
+ public void testChangeFollowerToListener() throws Exception {
+ try(final MiniRaftCluster cluster = newCluster(3)) {
+ cluster.start();
+ final RaftServer.Division leader = waitForLeader(cluster);
+ // Unlike the sibling listener tests, the client is created without passing the leader id.
+ try (RaftClient client = cluster.createClient()) {
+ client.io().send(new RaftTestUtil.SimpleMessage("message"));
+ List<RaftPeer> followers = cluster.getFollowers().stream().map(
+ RaftServer.Division::getPeer).collect(Collectors.toList());
+ Assert.assertEquals(2, followers.size());
+ List<RaftPeer> listeners = new ArrayList<>();
+ // Copy the second follower into the listener list first, then drop it from followers.
+ listeners.add(followers.get(1));
+ followers.remove(1);
+ // NOTE(review): the first argument holds only the one remaining follower; the leader's
+ // own peer is never added to it (testRemoveListener adds leader.getPeer()) - confirm
+ // that leaving the leader out of the requested configuration is intended.
+ RaftClientReply reply = client.admin().setConfiguration(followers, listeners);
+ Assert.assertTrue(reply.isSuccess());
+ Collection<RaftPeer> peer = leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
+ Assert.assertEquals(1, peer.size());
+ Assert.assertEquals(listeners.get(0).getId(), new ArrayList<>(peer).get(0).getId());
+ }
+ cluster.shutdown();
+ }
+ }
+
+  /**
+   * Starts a cluster created with newCluster(2, 1), checks that it reports one listener,
+   * then calls setConfiguration with cluster.getPeers() and no listener list, and verifies
+   * that the leader's configuration has no LISTENER peer left.
+   *
+   * @throws Exception if the cluster, the client or the configuration change fails
+   */
+  @Test
+  public void testChangeListenerToFollower() throws Exception {
+    try (final MiniRaftCluster cluster = newCluster(2, 1)) {
+      cluster.start();
+      final RaftServer.Division leader = waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient(leader.getId())) {
+        client.io().send(new RaftTestUtil.SimpleMessage("message"));
+        List<RaftPeer> listeners = cluster.getListeners()
+            .stream().map(RaftServer.Division::getPeer).collect(Collectors.toList());
+        // assertEquals takes (expected, actual); this order keeps the failure message truthful.
+        Assert.assertEquals(1, listeners.size());
+        RaftClientReply reply = client.admin().setConfiguration(cluster.getPeers());
+        Assert.assertTrue(reply.isSuccess());
+        Collection<RaftPeer> peer = leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
+        Assert.assertEquals(0, peer.size());
+      }
+      cluster.shutdown();
+    }
+  }
+
@Test
public void testLeaderElectionMetrics() throws IOException, InterruptedException {
Timestamp timestamp = Timestamp.currentTime();
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index f5cd38b36..1f4047524 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -656,6 +656,12 @@ public abstract class MiniRaftCluster implements Closeable {
.collect(Collectors.toList());
}
+  /**
+   * @return the servers from {@code getServerAliveStream()} whose
+   *         {@code getInfo().isListener()} is true.
+   */
+  public List<RaftServer.Division> getListeners() {
+    final List<RaftServer.Division> listeners = getServerAliveStream()
+        .filter(division -> division.getInfo().isListener())
+        .collect(Collectors.toList());
+    return listeners;
+  }
+
public int getNumServers() {
return servers.size();
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
index 608407786..c821f36c4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
@@ -143,4 +143,34 @@ public abstract class SnapshotManagementTest<CLUSTER extends MiniRaftCluster>
.getStateMachineStorage().getSnapshotFile(follower.getInfo().getCurrentTerm(), snapshotIndex);
Assert.assertTrue(snapshotFile.exists());
}
+
+
+ /**
+ * Runs runTestReceiveLogAndTakeSnapshotOnListener against a cluster supplied by
+ * runWithNewCluster(2, 1, ...).
+ * NOTE(review): the arguments presumably mean 2 servers plus 1 listener - confirm against
+ * runWithNewCluster.
+ */
+ @Test
+ public void testReceiveLogAndTakeSnapshotOnListener() throws Exception {
+ runWithNewCluster(2, 1, this::runTestReceiveLogAndTakeSnapshotOnListener);
+ }
+
+  /**
+   * Sends messages through a client created for the listener, asks the listener to create a
+   * snapshot, and verifies that the snapshot file exists in the listener's state machine storage.
+   *
+   * @param cluster the cluster under test; it must report at least one listener
+   * @throws Exception if waiting for the leader, sending a message or creating the snapshot fails
+   */
+  void runTestReceiveLogAndTakeSnapshotOnListener(CLUSTER cluster) throws Exception {
+    final RaftClientReply snapshotReply;
+    // Wait for a leader before issuing requests; the returned division itself is not used here.
+    RaftTestUtil.waitForLeader(cluster);
+    final RaftServer.Division listener = cluster.getListeners().get(0);
+    final RaftPeerId listenerId = listener.getId();
+    Assert.assertTrue(listener.getInfo().isListener());
+    try (final RaftClient client = cluster.createClient(listenerId)) {
+      // The gap comes from the test properties, so read it once rather than on every iteration.
+      final long creationGap = RaftServerConfigKeys.Snapshot.creationGap(getProperties());
+      for (int i = 0; i < creationGap; i++) {
+        RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+        Assert.assertTrue(reply.isSuccess());
+      }
+      // Request the snapshot on the listener itself; 3000 is the argument passed to create().
+      snapshotReply = client.getSnapshotManagementApi(listenerId).create(3000);
+    }
+
+    Assert.assertTrue(snapshotReply.isSuccess());
+    final long snapshotIndex = snapshotReply.getLogIndex();
+    LOG.info("snapshotIndex = {} on {} server {}",
+        snapshotIndex, listener.getInfo().getCurrentRole(), listener.getId());
+
+    final File snapshotFile = SimpleStateMachine4Testing.get(listener)
+        .getStateMachineStorage().getSnapshotFile(listener.getInfo().getCurrentTerm(), snapshotIndex);
+    Assert.assertTrue(snapshotFile.exists());
+  }
}