You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/08/16 13:18:30 UTC

[ratis] 01/12: RATIS-1640. Add unit-test of listener related to setConfiguration and takeSnapshot (#697)

This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit ce5357a2eac6aee4e824649776e916b6dbd0fe22
Author: Yaolong Liu <ly...@163.com>
AuthorDate: Wed Jul 27 02:09:05 2022 +0800

    RATIS-1640. Add unit-test of listener related to setConfiguration and takeSnapshot (#697)
    
    
    (cherry picked from commit d3a0f9491f17462555c8fe522cbdc2ea4c88ef3b)
---
 .../ratis/server/impl/LeaderElectionTests.java     | 88 ++++++++++++++++++++++
 .../apache/ratis/server/impl/MiniRaftCluster.java  |  6 ++
 .../ratis/statemachine/SnapshotManagementTest.java | 30 ++++++++
 3 files changed, 124 insertions(+)

diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index b988d3f4f..6b5d04b24 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -23,6 +23,7 @@ import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
@@ -46,12 +47,16 @@ import org.junit.Test;
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
 import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LAST_LEADER_ELECTION_ELAPSED_TIME;
@@ -312,6 +317,89 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
     cluster.setBlockRequestsFrom(id.toString(), false);
   }
 
+  @Test
+  public void testAddListener() throws Exception { // adds one LISTENER peer through setConfiguration
+    try (final MiniRaftCluster cluster = newCluster(3)) {
+      cluster.start();
+      final RaftServer.Division leader = waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient(leader.getId())) {
+        client.io().send(new RaftTestUtil.SimpleMessage("message")); // one write before reconfiguring
+        List<RaftPeer> servers = cluster.getPeers();
+        Assert.assertEquals(3, servers.size()); // JUnit argument order is (expected, actual)
+        MiniRaftCluster.PeerChanges changes = cluster.addNewPeers(1, // one new peer, role LISTENER
+            true, false, RaftProtos.RaftPeerRole.LISTENER);
+        RaftClientReply reply = client.admin().setConfiguration(servers, Arrays.asList(changes.newPeers));
+        Assert.assertTrue(reply.isSuccess());
+        Collection<RaftPeer> listener = // listeners as recorded in the leader's configuration
+            leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
+        Assert.assertEquals(1, listener.size());
+        Assert.assertEquals(changes.newPeers[0].getId(), new ArrayList<>(listener).get(0).getId());
+      }
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testRemoveListener() throws Exception { // reconfigures without the listener, expects none left
+    try(final MiniRaftCluster cluster = newCluster(3,1)) { // presumably 3 servers + 1 listener -- TODO confirm
+      cluster.start();
+      final RaftServer.Division leader = waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient(leader.getId())) {
+        client.io().send(new RaftTestUtil.SimpleMessage("message")); // one write before reconfiguring
+        Assert.assertEquals(1, cluster.getListeners().size()); // precondition: exactly one listener
+        List<RaftPeer> servers = cluster.getFollowers().stream().map(RaftServer.Division::getPeer).collect(
+            Collectors.toList());
+        servers.add(leader.getPeer()); // new server list = followers + leader; no listener list is passed
+        RaftClientReply reply = client.admin().setConfiguration(servers);
+        Assert.assertTrue(reply.isSuccess());
+        Assert.assertEquals(0, leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER).size());
+      }
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testChangeFollowerToListener() throws Exception { // moves one follower to the listener list
+    try(final MiniRaftCluster cluster = newCluster(3)) {
+      cluster.start();
+      final RaftServer.Division leader = waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient()) {
+        client.io().send(new RaftTestUtil.SimpleMessage("message")); // one write before reconfiguring
+        List<RaftPeer> followers = cluster.getFollowers().stream().map(
+            RaftServer.Division::getPeer).collect(Collectors.toList());
+        Assert.assertEquals(2, followers.size());
+        List<RaftPeer> listeners = new ArrayList<>();
+        listeners.add(followers.get(1)); // the follower at index 1 becomes the listener
+        followers.remove(1); // NOTE(review): leader.getPeer() is never added to this list -- confirm intended
+        RaftClientReply reply = client.admin().setConfiguration(followers, listeners);
+        Assert.assertTrue(reply.isSuccess());
+        Collection<RaftPeer> peer = leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
+        Assert.assertEquals(1, peer.size());
+        Assert.assertEquals(listeners.get(0).getId(), new ArrayList<>(peer).get(0).getId());
+      }
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testChangeListenerToFollower() throws Exception { // reconfigures with all peers as servers
+    try(final MiniRaftCluster cluster = newCluster(2, 1)) { // presumably 2 servers + 1 listener -- TODO confirm
+      cluster.start();
+      final RaftServer.Division leader = waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient(leader.getId())) {
+        client.io().send(new RaftTestUtil.SimpleMessage("message")); // one write before reconfiguring
+        List<RaftPeer> listeners = cluster.getListeners()
+            .stream().map(RaftServer.Division::getPeer).collect(Collectors.toList());
+        Assert.assertEquals(1, listeners.size()); // JUnit argument order is (expected, actual)
+        RaftClientReply reply = client.admin().setConfiguration(cluster.getPeers()); // no listener list passed
+        Assert.assertTrue(reply.isSuccess());
+        Collection<RaftPeer> peer = leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
+        Assert.assertEquals(0, peer.size()); // the leader's configuration no longer lists any listener
+      }
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testLeaderElectionMetrics() throws IOException, InterruptedException {
     Timestamp timestamp = Timestamp.currentTime();
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index f5cd38b36..1f4047524 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -656,6 +656,12 @@ public abstract class MiniRaftCluster implements Closeable {
         .collect(Collectors.toList());
   }
 
+  public List<RaftServer.Division> getListeners() { // divisions whose info reports isListener()
+    return getServerAliveStream() // only servers yielded by getServerAliveStream() are considered
+        .filter(server -> server.getInfo().isListener())
+        .collect(Collectors.toList());
+  }
+
   public int getNumServers() {
     return servers.size();
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
index 608407786..c821f36c4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
@@ -143,4 +143,34 @@ public abstract class SnapshotManagementTest<CLUSTER extends MiniRaftCluster>
         .getStateMachineStorage().getSnapshotFile(follower.getInfo().getCurrentTerm(), snapshotIndex);
     Assert.assertTrue(snapshotFile.exists());
   }
+
+
+  @Test
+  public void testReceiveLogAndTakeSnapshotOnListener() throws Exception {
+    runWithNewCluster(2, 1, this::runTestReceiveLogAndTakeSnapshotOnListener); // presumably 2 servers + 1 listener
+  }
+
+  void runTestReceiveLogAndTakeSnapshotOnListener(CLUSTER cluster) throws Exception {
+    final RaftClientReply snapshotReply;
+    RaftTestUtil.waitForLeader(cluster); // only the wait matters here; the returned leader was never read
+    final RaftServer.Division listener = cluster.getListeners().get(0);
+    final RaftPeerId listenerId = listener.getId();
+    Assert.assertTrue(listener.getInfo().isListener());
+    try (final RaftClient client = cluster.createClient(listenerId)) {
+      for (int i = 0; i < RaftServerConfigKeys.Snapshot.creationGap(getProperties()); i++) {
+        RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+        Assert.assertTrue(reply.isSuccess());
+      }
+      snapshotReply = client.getSnapshotManagementApi(listenerId).create(3000); // snapshot requested on the listener
+    }
+
+    Assert.assertTrue(snapshotReply.isSuccess());
+    final long snapshotIndex = snapshotReply.getLogIndex();
+    LOG.info("snapshotIndex = {} on {} server {}",
+        snapshotIndex, listener.getInfo().getCurrentRole(), listenerId);
+
+    final File snapshotFile = SimpleStateMachine4Testing.get(listener) // looked up in the listener's storage
+        .getStateMachineStorage().getSnapshotFile(listener.getInfo().getCurrentTerm(), snapshotIndex);
+    Assert.assertTrue(snapshotFile.exists());
+  }
 }