You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/06/16 14:22:09 UTC
[ignite-3] branch main updated: IGNITE-14910 Use local address in
JRaft tests (#178)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new cb7d9f7 IGNITE-14910 Use local address in JRaft tests (#178)
cb7d9f7 is described below
commit cb7d9f7efd61af69561115c2c748755c2e1bd2f3
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Wed Jun 16 17:22:03 2021 +0300
IGNITE-14910 Use local address in JRaft tests (#178)
---
.../ignite/raft/jraft/core/ITCliServiceTest.java | 391 +++--
.../apache/ignite/raft/jraft/core/ITNodeTest.java | 1559 +++++++++-----------
.../raft/server/ITJRaftCounterServerTest.java | 58 +-
.../ignite/raft/jraft/rpc/AbstractRpcTest.java | 7 +-
.../apache/ignite/raft/jraft/test/TestUtils.java | 70 +-
5 files changed, 967 insertions(+), 1118 deletions(-)
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
index 5a01039..052f35d 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.raft.jraft.core;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
@@ -48,21 +48,20 @@ import org.apache.ignite.raft.jraft.option.CliOptions;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Utils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.lang.Thread.sleep;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
/**
* Jraft cli tests.
@@ -92,39 +91,32 @@ public class ITCliServiceTest {
private Configuration conf;
- @Rule
- public TestName testName = new TestName();
-
private static final int LEARNER_PORT_STEP = 100;
- @Before
- public void setup() throws Exception {
- LOG.info(">>>>>>>>>>>>>>> Start test method: " + this.testName.getMethodName());
- this.dataPath = TestUtils.mkTempDir();
- new File(this.dataPath).mkdirs();
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ @BeforeEach
+ public void setup(TestInfo testInfo) throws Exception {
+ LOG.info(">>>>>>>>>>>>>>> Start test method: " + testInfo.getDisplayName());
+ dataPath = TestUtils.mkTempDir();
+ new File(dataPath).mkdirs();
+ List<PeerId> peers = TestUtils.generatePeers(3);
- final LinkedHashSet<PeerId> learners = new LinkedHashSet<>();
+ LinkedHashSet<PeerId> learners = new LinkedHashSet<>();
// 2 learners
- for (int i = 0; i < 2; i++) {
- learners.add(new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + LEARNER_PORT_STEP + i));
- }
+ for (int i = 0; i < 2; i++)
+ learners.add(new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + LEARNER_PORT_STEP + i));
- this.cluster = new TestCluster(this.groupId, this.dataPath, peers, learners, 300);
- for (final PeerId peer : peers) {
- this.cluster.start(peer.getEndpoint());
- }
+ cluster = new TestCluster(groupId, dataPath, peers, learners, 300);
+ for (PeerId peer : peers)
+ cluster.start(peer.getEndpoint());
- for (final PeerId peer : learners) {
- this.cluster.startLearner(peer);
- }
+ for (PeerId peer : learners)
+ cluster.startLearner(peer);
- this.cluster.waitLeader();
+ cluster.waitLeader();
- for (Node follower : cluster.getFollowers()) {
+ for (Node follower : cluster.getFollowers())
assertTrue(waitForCondition(() -> follower.getLeaderId() != null, 3_000));
- }
for (PeerId learner : cluster.getLearners()) {
Node node = cluster.getNode(learner.getEndpoint());
@@ -132,8 +124,8 @@ public class ITCliServiceTest {
assertTrue(waitForCondition(() -> node.getLeaderId() != null, 3_000));
}
- this.cliService = new CliServiceImpl();
- this.conf = new Configuration(peers, learners);
+ cliService = new CliServiceImpl();
+ conf = new Configuration(peers, learners);
CliOptions opts = new CliOptions();
opts.setClientExecutor(JRaftUtils.createClientExecutor(opts, "client"));
@@ -144,42 +136,42 @@ public class ITCliServiceTest {
IgniteRpcClient rpcClient = new IgniteRpcClient(clientSvc, false);
opts.setRpcClient(rpcClient);
- assertTrue(this.cliService.init(opts));
+ assertTrue(cliService.init(opts));
}
- @After
- public void teardown() throws Exception {
- this.cliService.shutdown();
- this.cluster.stopAll();
- Utils.delete(new File(this.dataPath));
- LOG.info(">>>>>>>>>>>>>>> End test method: " + this.testName.getMethodName());
+ @AfterEach
+ public void teardown(TestInfo testInfo) throws Exception {
+ cliService.shutdown();
+ cluster.stopAll();
+ Utils.delete(new File(dataPath));
+ LOG.info(">>>>>>>>>>>>>>> End test method: " + testInfo.getDisplayName());
}
@Test
public void testTransferLeader() throws Exception {
- final PeerId leader = this.cluster.getLeader().getNodeId().getPeerId().copy();
+ PeerId leader = cluster.getLeader().getNodeId().getPeerId().copy();
assertNotNull(leader);
- final Set<PeerId> peers = this.conf.getPeerSet();
+ Set<PeerId> peers = conf.getPeerSet();
PeerId targetPeer = null;
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers) {
if (!peer.equals(leader)) {
targetPeer = peer;
break;
}
}
assertNotNull(targetPeer);
- assertTrue(this.cliService.transferLeader(this.groupId, this.conf, targetPeer).isOk());
- this.cluster.waitLeader();
- assertEquals(targetPeer, this.cluster.getLeader().getNodeId().getPeerId());
+ assertTrue(cliService.transferLeader(groupId, conf, targetPeer).isOk());
+ cluster.waitLeader();
+ assertEquals(targetPeer, cluster.getLeader().getNodeId().getPeerId());
}
@SuppressWarnings("SameParameterValue")
- private void sendTestTaskAndWait(final Node node, final int code) throws InterruptedException {
- final CountDownLatch latch = new CountDownLatch(10);
+ private void sendTestTaskAndWait(Node node, int code) throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
- final ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes());
- final Task task = new Task(data, new ExpectClosure(code, null, latch));
+ ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes());
+ Task task = new Task(data, new ExpectClosure(code, null, latch));
node.apply(task);
}
assertTrue(latch.await(10, TimeUnit.SECONDS));
@@ -187,214 +179,200 @@ public class ITCliServiceTest {
@Test
public void testLearnerServices() throws Exception {
- final PeerId learner3 = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + LEARNER_PORT_STEP + 3);
- assertTrue(this.cluster.startLearner(learner3));
- sendTestTaskAndWait(this.cluster.getLeader(), 0);
- Thread.sleep(500);
- for (final MockStateMachine fsm : this.cluster.getFsms()) {
- if (!fsm.getAddress().equals(learner3.getEndpoint())) {
+ PeerId learner3 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + LEARNER_PORT_STEP + 3);
+ assertTrue(cluster.startLearner(learner3));
+ sendTestTaskAndWait(cluster.getLeader(), 0);
+ sleep(500);
+ for (MockStateMachine fsm : cluster.getFsms()) {
+ if (!fsm.getAddress().equals(learner3.getEndpoint()))
assertEquals(10, fsm.getLogs().size());
- }
}
- assertEquals(0, this.cluster.getFsmByPeer(learner3).getLogs().size());
- List<PeerId> oldLearners = new ArrayList<PeerId>(this.conf.getLearners());
- assertEquals(oldLearners, this.cliService.getLearners(this.groupId, this.conf));
- assertEquals(oldLearners, this.cliService.getAliveLearners(this.groupId, this.conf));
+ assertEquals(0, cluster.getFsmByPeer(learner3).getLogs().size());
+ List<PeerId> oldLearners = new ArrayList<PeerId>(conf.getLearners());
+ assertEquals(oldLearners, cliService.getLearners(groupId, conf));
+ assertEquals(oldLearners, cliService.getAliveLearners(groupId, conf));
// Add learner3
- this.cliService.addLearners(this.groupId, this.conf, Arrays.asList(learner3));
- Thread.sleep(1000);
- assertEquals(10, this.cluster.getFsmByPeer(learner3).getLogs().size());
+ cliService.addLearners(groupId, conf, Collections.singletonList(learner3));
+ sleep(1000);
+ assertEquals(10, cluster.getFsmByPeer(learner3).getLogs().size());
- sendTestTaskAndWait(this.cluster.getLeader(), 0);
- Thread.sleep(1000);
- for (final MockStateMachine fsm : this.cluster.getFsms()) {
+ sendTestTaskAndWait(cluster.getLeader(), 0);
+ sleep(1000);
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(20, fsm.getLogs().size());
-
- }
List<PeerId> newLearners = new ArrayList<>(oldLearners);
newLearners.add(learner3);
- assertEquals(newLearners, this.cliService.getLearners(this.groupId, this.conf));
- assertEquals(newLearners, this.cliService.getAliveLearners(this.groupId, this.conf));
+ assertEquals(newLearners, cliService.getLearners(groupId, conf));
+ assertEquals(newLearners, cliService.getAliveLearners(groupId, conf));
// Remove 3
- this.cliService.removeLearners(this.groupId, this.conf, Arrays.asList(learner3));
- sendTestTaskAndWait(this.cluster.getLeader(), 0);
- Thread.sleep(1000);
- for (final MockStateMachine fsm : this.cluster.getFsms()) {
- if (!fsm.getAddress().equals(learner3.getEndpoint())) {
+ cliService.removeLearners(groupId, conf, Collections.singletonList(learner3));
+ sendTestTaskAndWait(cluster.getLeader(), 0);
+ sleep(1000);
+ for (MockStateMachine fsm : cluster.getFsms()) {
+ if (!fsm.getAddress().equals(learner3.getEndpoint()))
assertEquals(30, fsm.getLogs().size());
- }
}
// Latest 10 logs are not replicated to learner3, because it's removed.
- assertEquals(20, this.cluster.getFsmByPeer(learner3).getLogs().size());
- assertEquals(oldLearners, this.cliService.getLearners(this.groupId, this.conf));
- assertEquals(oldLearners, this.cliService.getAliveLearners(this.groupId, this.conf));
+ assertEquals(20, cluster.getFsmByPeer(learner3).getLogs().size());
+ assertEquals(oldLearners, cliService.getLearners(groupId, conf));
+ assertEquals(oldLearners, cliService.getAliveLearners(groupId, conf));
// Set learners into [learner3]
- this.cliService.resetLearners(this.groupId, this.conf, Arrays.asList(learner3));
- Thread.sleep(100);
- assertEquals(30, this.cluster.getFsmByPeer(learner3).getLogs().size());
+ cliService.resetLearners(groupId, conf, Collections.singletonList(learner3));
+ sleep(100);
+ assertEquals(30, cluster.getFsmByPeer(learner3).getLogs().size());
- sendTestTaskAndWait(this.cluster.getLeader(), 0);
- Thread.sleep(1000);
+ sendTestTaskAndWait(cluster.getLeader(), 0);
+ sleep(1000);
// Latest 10 logs are not replicated to learner1 and learner2, because they were removed by resetting learners set.
- for (final MockStateMachine fsm : this.cluster.getFsms()) {
- if (!oldLearners.contains(new PeerId(fsm.getAddress(), 0))) {
+ for (MockStateMachine fsm : cluster.getFsms()) {
+ if (!oldLearners.contains(new PeerId(fsm.getAddress(), 0)))
assertEquals(40, fsm.getLogs().size());
- }
- else {
+ else
assertEquals(30, fsm.getLogs().size());
- }
}
- assertEquals(Arrays.asList(learner3), this.cliService.getLearners(this.groupId, this.conf));
- assertEquals(Arrays.asList(learner3), this.cliService.getAliveLearners(this.groupId, this.conf));
+ assertEquals(Collections.singletonList(learner3), cliService.getLearners(groupId, conf));
+ assertEquals(Collections.singletonList(learner3), cliService.getAliveLearners(groupId, conf));
// Stop learner3
- this.cluster.stop(learner3.getEndpoint());
- Thread.sleep(1000);
- assertEquals(Arrays.asList(learner3), this.cliService.getLearners(this.groupId, this.conf));
- assertTrue(this.cliService.getAliveLearners(this.groupId, this.conf).isEmpty());
+ cluster.stop(learner3.getEndpoint());
+ sleep(1000);
+ assertEquals(Collections.singletonList(learner3), cliService.getLearners(groupId, conf));
+ assertTrue(cliService.getAliveLearners(groupId, conf).isEmpty());
}
@Test
public void testAddPeerRemovePeer() throws Exception {
- final PeerId peer3 = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + 3);
- assertTrue(this.cluster.start(peer3.getEndpoint()));
- sendTestTaskAndWait(this.cluster.getLeader(), 0);
- Thread.sleep(100);
- assertEquals(0, this.cluster.getFsmByPeer(peer3).getLogs().size());
-
- assertTrue(this.cliService.addPeer(this.groupId, this.conf, peer3).isOk());
- Thread.sleep(100);
- assertEquals(10, this.cluster.getFsmByPeer(peer3).getLogs().size());
- sendTestTaskAndWait(this.cluster.getLeader(), 0);
- Thread.sleep(100);
- assertEquals(6, this.cluster.getFsms().size());
- for (final MockStateMachine fsm : this.cluster.getFsms()) {
+ PeerId peer3 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 3);
+ assertTrue(cluster.start(peer3.getEndpoint()));
+ sendTestTaskAndWait(cluster.getLeader(), 0);
+ sleep(100);
+ assertEquals(0, cluster.getFsmByPeer(peer3).getLogs().size());
+
+ assertTrue(cliService.addPeer(groupId, conf, peer3).isOk());
+ sleep(100);
+ assertEquals(10, cluster.getFsmByPeer(peer3).getLogs().size());
+ sendTestTaskAndWait(cluster.getLeader(), 0);
+ sleep(100);
+ assertEquals(6, cluster.getFsms().size());
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(20, fsm.getLogs().size());
- }
//remove peer3
- assertTrue(this.cliService.removePeer(this.groupId, this.conf, peer3).isOk());
- Thread.sleep(200);
- sendTestTaskAndWait(this.cluster.getLeader(), 0);
- Thread.sleep(1000);
- assertEquals(6, this.cluster.getFsms().size());
- for (final MockStateMachine fsm : this.cluster.getFsms()) {
- if (fsm.getAddress().equals(peer3.getEndpoint())) {
+ assertTrue(cliService.removePeer(groupId, conf, peer3).isOk());
+ sleep(200);
+ sendTestTaskAndWait(cluster.getLeader(), 0);
+ sleep(1000);
+ assertEquals(6, cluster.getFsms().size());
+ for (MockStateMachine fsm : cluster.getFsms()) {
+ if (fsm.getAddress().equals(peer3.getEndpoint()))
assertEquals(20, fsm.getLogs().size());
- }
- else {
+ else
assertEquals(30, fsm.getLogs().size());
- }
}
}
@Test
public void testChangePeers() throws Exception {
- final List<PeerId> newPeers = TestUtils.generatePeers(10);
- newPeers.removeAll(this.conf.getPeerSet());
- for (final PeerId peer : newPeers) {
- assertTrue(this.cluster.start(peer.getEndpoint()));
- }
- this.cluster.waitLeader();
- final Node oldLeaderNode = this.cluster.getLeader();
+ List<PeerId> newPeers = TestUtils.generatePeers(10);
+ newPeers.removeAll(conf.getPeerSet());
+ for (PeerId peer : newPeers)
+ assertTrue(cluster.start(peer.getEndpoint()));
+ cluster.waitLeader();
+ Node oldLeaderNode = cluster.getLeader();
assertNotNull(oldLeaderNode);
- final PeerId oldLeader = oldLeaderNode.getNodeId().getPeerId();
+ PeerId oldLeader = oldLeaderNode.getNodeId().getPeerId();
assertNotNull(oldLeader);
- assertTrue(this.cliService.changePeers(this.groupId, this.conf, new Configuration(newPeers)).isOk());
- this.cluster.waitLeader();
- final PeerId newLeader = this.cluster.getLeader().getNodeId().getPeerId();
+ assertTrue(cliService.changePeers(groupId, conf, new Configuration(newPeers)).isOk());
+ cluster.waitLeader();
+ PeerId newLeader = cluster.getLeader().getNodeId().getPeerId();
assertNotEquals(oldLeader, newLeader);
assertTrue(newPeers.contains(newLeader));
}
@Test
public void testSnapshot() throws Exception {
- sendTestTaskAndWait(this.cluster.getLeader(), 0);
- assertEquals(5, this.cluster.getFsms().size());
- for (final MockStateMachine fsm : this.cluster.getFsms()) {
+ sendTestTaskAndWait(cluster.getLeader(), 0);
+ assertEquals(5, cluster.getFsms().size());
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(0, fsm.getSaveSnapshotTimes());
- }
- for (final PeerId peer : this.conf) {
- assertTrue(this.cliService.snapshot(this.groupId, peer).isOk());
- }
+ for (PeerId peer : conf)
+ assertTrue(cliService.snapshot(groupId, peer).isOk());
- for (final PeerId peer : this.conf.getLearners()) {
- assertTrue(this.cliService.snapshot(this.groupId, peer).isOk());
- }
+ for (PeerId peer : conf.getLearners())
+ assertTrue(cliService.snapshot(groupId, peer).isOk());
- for (final MockStateMachine fsm : this.cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(1, fsm.getSaveSnapshotTimes());
- }
}
@Test
public void testGetPeers() throws Exception {
- PeerId leader = this.cluster.getLeader().getNodeId().getPeerId();
+ PeerId leader = cluster.getLeader().getNodeId().getPeerId();
assertNotNull(leader);
- assertArrayEquals(this.conf.getPeerSet().toArray(),
- new HashSet<>(this.cliService.getPeers(this.groupId, this.conf)).toArray());
+ assertArrayEquals(conf.getPeerSet().toArray(),
+ new HashSet<>(cliService.getPeers(groupId, conf)).toArray());
// stop one peer
- final List<PeerId> peers = this.conf.getPeers();
- this.cluster.stop(peers.get(0).getEndpoint());
+ List<PeerId> peers = conf.getPeers();
+ cluster.stop(peers.get(0).getEndpoint());
- this.cluster.waitLeader();
+ cluster.waitLeader();
- leader = this.cluster.getLeader().getNodeId().getPeerId();
+ leader = cluster.getLeader().getNodeId().getPeerId();
assertNotNull(leader);
- assertArrayEquals(this.conf.getPeerSet().toArray(),
- new HashSet<>(this.cliService.getPeers(this.groupId, this.conf)).toArray());
+ assertArrayEquals(conf.getPeerSet().toArray(),
+ new HashSet<>(cliService.getPeers(groupId, conf)).toArray());
- this.cluster.stopAll();
+ cluster.stopAll();
try {
- this.cliService.getPeers(this.groupId, this.conf);
+ cliService.getPeers(groupId, conf);
fail();
}
- catch (final IllegalStateException e) {
- assertTrue(e.getMessage(), e.getMessage().startsWith("Fail to get leader of group " + this.groupId));
+ catch (IllegalStateException e) {
+ assertTrue(e.getMessage().startsWith("Fail to get leader of group " + groupId), e.getMessage());
}
}
@Test
public void testGetAlivePeers() throws Exception {
- PeerId leader = this.cluster.getLeader().getNodeId().getPeerId();
+ PeerId leader = cluster.getLeader().getNodeId().getPeerId();
assertNotNull(leader);
- assertArrayEquals(this.conf.getPeerSet().toArray(),
- new HashSet<>(this.cliService.getAlivePeers(this.groupId, this.conf)).toArray());
+ assertArrayEquals(conf.getPeerSet().toArray(),
+ new HashSet<>(cliService.getAlivePeers(groupId, conf)).toArray());
// stop one peer
- final List<PeerId> peers = this.conf.getPeers();
- this.cluster.stop(peers.get(0).getEndpoint());
+ List<PeerId> peers = conf.getPeers();
+ cluster.stop(peers.get(0).getEndpoint());
peers.remove(0);
- this.cluster.waitLeader();
+ cluster.waitLeader();
- Thread.sleep(1000);
+ sleep(1000);
- leader = this.cluster.getLeader().getNodeId().getPeerId();
+ leader = cluster.getLeader().getNodeId().getPeerId();
assertNotNull(leader);
assertArrayEquals(new HashSet<>(peers).toArray(),
- new HashSet<>(this.cliService.getAlivePeers(this.groupId, this.conf)).toArray());
+ new HashSet<>(cliService.getAlivePeers(groupId, conf)).toArray());
- this.cluster.stopAll();
+ cluster.stopAll();
try {
- this.cliService.getAlivePeers(this.groupId, this.conf);
+ cliService.getAlivePeers(groupId, conf);
fail();
}
- catch (final IllegalStateException e) {
- assertTrue(e.getMessage(), e.getMessage().startsWith("Fail to get leader of group " + this.groupId));
+ catch (IllegalStateException e) {
+ assertTrue(e.getMessage().startsWith("Fail to get leader of group " + groupId), e.getMessage());
}
}
@Test
public void testRebalance() {
- final Set<String> groupIds = new TreeSet<>();
+ Set<String> groupIds = new TreeSet<>();
groupIds.add("group_1");
groupIds.add("group_2");
groupIds.add("group_3");
@@ -403,23 +381,22 @@ public class ITCliServiceTest {
groupIds.add("group_6");
groupIds.add("group_7");
groupIds.add("group_8");
- final Configuration conf = new Configuration();
+ Configuration conf = new Configuration();
conf.addPeer(new PeerId("host_1", 8080));
conf.addPeer(new PeerId("host_2", 8080));
conf.addPeer(new PeerId("host_3", 8080));
- final Map<String, PeerId> rebalancedLeaderIds = new HashMap<>();
+ Map<String, PeerId> rebalancedLeaderIds = new HashMap<>();
- final CliService cliService = new MockCliService(rebalancedLeaderIds, new PeerId("host_1", 8080));
+ CliService cliService = new MockCliService(rebalancedLeaderIds, new PeerId("host_1", 8080));
assertTrue(cliService.rebalance(groupIds, conf, rebalancedLeaderIds).isOk());
assertEquals(groupIds.size(), rebalancedLeaderIds.size());
- final Map<PeerId, Integer> ret = new HashMap<>();
- for (Map.Entry<String, PeerId> entry : rebalancedLeaderIds.entrySet()) {
+ Map<PeerId, Integer> ret = new HashMap<>();
+ for (Map.Entry<String, PeerId> entry : rebalancedLeaderIds.entrySet())
ret.compute(entry.getValue(), (ignored, num) -> num == null ? 1 : num + 1);
- }
- final int expectedAvgLeaderNum = (int) Math.ceil((double) groupIds.size() / conf.size());
+ int expectedAvgLeaderNum = (int) Math.ceil((double) groupIds.size() / conf.size());
for (Map.Entry<PeerId, Integer> entry : ret.entrySet()) {
System.out.println(entry);
assertTrue(entry.getValue() <= expectedAvgLeaderNum);
@@ -428,26 +405,26 @@ public class ITCliServiceTest {
@Test
public void testRebalanceOnLeaderFail() {
- final Set<String> groupIds = new TreeSet<>();
+ Set<String> groupIds = new TreeSet<>();
groupIds.add("group_1");
groupIds.add("group_2");
groupIds.add("group_3");
groupIds.add("group_4");
- final Configuration conf = new Configuration();
+ Configuration conf = new Configuration();
conf.addPeer(new PeerId("host_1", 8080));
conf.addPeer(new PeerId("host_2", 8080));
conf.addPeer(new PeerId("host_3", 8080));
- final Map<String, PeerId> rebalancedLeaderIds = new HashMap<>();
+ Map<String, PeerId> rebalancedLeaderIds = new HashMap<>();
- final CliService cliService = new MockLeaderFailCliService();
+ CliService cliService = new MockLeaderFailCliService();
assertEquals("Fail to get leader", cliService.rebalance(groupIds, conf, rebalancedLeaderIds).getErrorMsg());
}
@Test
public void testRelalanceOnTransferLeaderFail() {
- final Set<String> groupIds = new TreeSet<>();
+ Set<String> groupIds = new TreeSet<>();
groupIds.add("group_1");
groupIds.add("group_2");
groupIds.add("group_3");
@@ -455,24 +432,23 @@ public class ITCliServiceTest {
groupIds.add("group_5");
groupIds.add("group_6");
groupIds.add("group_7");
- final Configuration conf = new Configuration();
+ Configuration conf = new Configuration();
conf.addPeer(new PeerId("host_1", 8080));
conf.addPeer(new PeerId("host_2", 8080));
conf.addPeer(new PeerId("host_3", 8080));
- final Map<String, PeerId> rebalancedLeaderIds = new HashMap<>();
+ Map<String, PeerId> rebalancedLeaderIds = new HashMap<>();
- final CliService cliService = new MockTransferLeaderFailCliService(rebalancedLeaderIds,
+ CliService cliService = new MockTransferLeaderFailCliService(rebalancedLeaderIds,
new PeerId("host_1", 8080));
assertEquals("Fail to transfer leader",
cliService.rebalance(groupIds, conf, rebalancedLeaderIds).getErrorMsg());
assertTrue(groupIds.size() >= rebalancedLeaderIds.size());
- final Map<PeerId, Integer> ret = new HashMap<>();
- for (Map.Entry<String, PeerId> entry : rebalancedLeaderIds.entrySet()) {
+ Map<PeerId, Integer> ret = new HashMap<>();
+ for (Map.Entry<String, PeerId> entry : rebalancedLeaderIds.entrySet())
ret.compute(entry.getValue(), (ignored, num) -> num == null ? 1 : num + 1);
- }
for (Map.Entry<PeerId, Integer> entry : ret.entrySet()) {
System.out.println(entry);
assertEquals(new PeerId("host_1", 8080), entry.getKey());
@@ -483,30 +459,31 @@ public class ITCliServiceTest {
private final Map<String, PeerId> rebalancedLeaderIds;
private final PeerId initialLeaderId;
- MockCliService(final Map<String, PeerId> rebalancedLeaderIds, final PeerId initialLeaderId) {
+ MockCliService(Map<String, PeerId> rebalancedLeaderIds, PeerId initialLeaderId) {
this.rebalancedLeaderIds = rebalancedLeaderIds;
this.initialLeaderId = initialLeaderId;
}
+ /** {@inheritDoc} */
@Override
- public Status getLeader(final String groupId, final Configuration conf, final PeerId leaderId) {
- final PeerId ret = this.rebalancedLeaderIds.get(groupId);
- if (ret != null) {
+ public Status getLeader(String groupId, Configuration conf, PeerId leaderId) {
+ PeerId ret = rebalancedLeaderIds.get(groupId);
+ if (ret != null)
leaderId.parse(ret.toString());
- }
- else {
- leaderId.parse(this.initialLeaderId.toString());
- }
+ else
+ leaderId.parse(initialLeaderId.toString());
return Status.OK();
}
+ /** {@inheritDoc} */
@Override
- public List<PeerId> getAlivePeers(final String groupId, final Configuration conf) {
+ public List<PeerId> getAlivePeers(String groupId, Configuration conf) {
return conf.getPeers();
}
+ /** {@inheritDoc} */
@Override
- public Status transferLeader(final String groupId, final Configuration conf, final PeerId peer) {
+ public Status transferLeader(String groupId, Configuration conf, PeerId peer) {
return Status.OK();
}
}
@@ -516,19 +493,21 @@ public class ITCliServiceTest {
super(null, null);
}
+ /** {@inheritDoc} */
@Override
- public Status getLeader(final String groupId, final Configuration conf, final PeerId leaderId) {
+ public Status getLeader(String groupId, Configuration conf, PeerId leaderId) {
return new Status(-1, "Fail to get leader");
}
}
static class MockTransferLeaderFailCliService extends MockCliService {
- MockTransferLeaderFailCliService(final Map<String, PeerId> rebalancedLeaderIds, final PeerId initialLeaderId) {
+ MockTransferLeaderFailCliService(Map<String, PeerId> rebalancedLeaderIds, PeerId initialLeaderId) {
super(rebalancedLeaderIds, initialLeaderId);
}
+ /** {@inheritDoc} */
@Override
- public Status transferLeader(final String groupId, final Configuration conf, final PeerId peer) {
+ public Status transferLeader(String groupId, Configuration conf, PeerId peer) {
return new Status(-1, "Fail to transfer leader");
}
}
@@ -538,7 +517,7 @@ public class ITCliServiceTest {
* @param timeout The timeout.
* @return {@code True} if condition has happened within the timeout.
*/
- @SuppressWarnings("BusyWait") protected boolean waitForCondition(BooleanSupplier cond, long timeout) {
+ @SuppressWarnings("BusyWait") protected static boolean waitForCondition(BooleanSupplier cond, long timeout) {
long stop = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < stop) {
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
index d168942..80c54e6 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.ignite.raft.jraft.core;
-import com.codahale.metrics.ConsoleReporter;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -38,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
+import com.codahale.metrics.ConsoleReporter;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.raft.jraft.Iterator;
import org.apache.ignite.raft.jraft.JRaftUtils;
@@ -76,27 +76,26 @@ import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Bits;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.Utils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
/**
* Integration tests for raft cluster.
@@ -109,9 +108,6 @@ public class ITNodeTest {
private final AtomicInteger startedCounter = new AtomicInteger(0);
private final AtomicInteger stoppedCounter = new AtomicInteger(0);
- @Rule
- public TestName testName = new TestName();
-
private long testStartMs;
private static DumpThread dumpThread;
@@ -122,9 +118,10 @@ public class ITNodeTest {
private static long DUMP_TIMEOUT_MS = 5 * 60 * 1000;
private volatile boolean stopped = false;
+ /** {@inheritDoc} */
@SuppressWarnings("BusyWait") @Override
public void run() {
- while (!this.stopped) {
+ while (!stopped) {
try {
Thread.sleep(DUMP_TIMEOUT_MS);
LOG.info("Test hang too long, dump threads");
@@ -138,7 +135,7 @@ public class ITNodeTest {
}
}
- @BeforeClass
+ @BeforeAll
public static void setupNodeTest() {
dumpThread = new DumpThread();
dumpThread.setName("NodeTest-DumpThread");
@@ -146,49 +143,49 @@ public class ITNodeTest {
dumpThread.start();
}
- @AfterClass
+ @AfterAll
public static void tearNodeTest() throws Exception {
dumpThread.stopped = true;
dumpThread.interrupt();
dumpThread.join(100);
}
- @Before
- public void setup() throws Exception {
- LOG.info(">>>>>>>>>>>>>>> Start test method: " + this.testName.getMethodName());
- this.dataPath = TestUtils.mkTempDir();
+ @BeforeEach
+ public void setup(TestInfo testInfo) throws Exception {
+ LOG.info(">>>>>>>>>>>>>>> Start test method: " + testInfo.getDisplayName());
+ dataPath = TestUtils.mkTempDir();
- File dataFile = new File(this.dataPath);
+ File dataFile = new File(dataPath);
if (dataFile.exists())
assertTrue(Utils.delete(dataFile));
dataFile.mkdirs();
- this.testStartMs = Utils.monotonicMs();
+ testStartMs = Utils.monotonicMs();
dumpThread.interrupt(); // reset dump timeout
}
- @After
- public void teardown() throws Exception {
+ @AfterEach
+ public void teardown(TestInfo testInfo) throws Exception {
if (cluster != null)
cluster.stopAll();
- assertTrue(Utils.delete(new File(this.dataPath)));
- this.startedCounter.set(0);
- this.stoppedCounter.set(0);
- LOG.info(">>>>>>>>>>>>>>> End test method: " + this.testName.getMethodName() + ", cost:"
- + (Utils.monotonicMs() - this.testStartMs) + " ms.");
+ assertTrue(Utils.delete(new File(dataPath)));
+ startedCounter.set(0);
+ stoppedCounter.set(0);
+ LOG.info(">>>>>>>>>>>>>>> End test method: " + testInfo.getDisplayName() + ", cost:"
+ + (Utils.monotonicMs() - testStartMs) + " ms.");
}
@Test
- public void testInitShutdown() throws Exception {
- final Endpoint addr = new Endpoint(TestUtils.getMyIp(), TestUtils.INIT_PORT);
- final NodeOptions nodeOptions = createNodeOptions();
+ public void testInitShutdown() {
+ Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
+ NodeOptions nodeOptions = createNodeOptions();
nodeOptions.setFsm(new MockStateMachine(addr));
- nodeOptions.setLogUri(this.dataPath + File.separator + "log");
- nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta");
- nodeOptions.setSnapshotUri(this.dataPath + File.separator + "snapshot");
+ nodeOptions.setLogUri(dataPath + File.separator + "log");
+ nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
+ nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions);
@@ -199,36 +196,35 @@ public class ITNodeTest {
@Test
public void testNodeTaskOverload() throws Exception {
- final Endpoint addr = new Endpoint(TestUtils.getMyIp(), TestUtils.INIT_PORT);
- final PeerId peer = new PeerId(addr, 0);
+ Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
+ PeerId peer = new PeerId(addr, 0);
- final NodeOptions nodeOptions = createNodeOptions();
- final RaftOptions raftOptions = new RaftOptions();
+ NodeOptions nodeOptions = createNodeOptions();
+ RaftOptions raftOptions = new RaftOptions();
raftOptions.setDisruptorBufferSize(2);
nodeOptions.setRaftOptions(raftOptions);
- final MockStateMachine fsm = new MockStateMachine(addr);
+ MockStateMachine fsm = new MockStateMachine(addr);
nodeOptions.setFsm(fsm);
- nodeOptions.setLogUri(this.dataPath + File.separator + "log");
- nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta");
- nodeOptions.setSnapshotUri(this.dataPath + File.separator + "snapshot");
+ nodeOptions.setLogUri(dataPath + File.separator + "log");
+ nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
+ nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer)));
RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions);
- final Node node = service.start(true);
+ Node node = service.start(true);
assertEquals(1, node.listPeers().size());
assertTrue(node.listPeers().contains(peer));
- while (!node.isLeader()) {
+ while (!node.isLeader())
;
- }
- final List<Task> tasks = new ArrayList<>();
- final AtomicInteger c = new AtomicInteger(0);
+ List<Task> tasks = new ArrayList<>();
+ AtomicInteger c = new AtomicInteger(0);
for (int i = 0; i < 10; i++) {
- final ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes());
- final Task task = new Task(data, new JoinableClosure(status -> {
+ ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes());
+ Task task = new Task(data, new JoinableClosure(status -> {
System.out.println(status);
if (!status.isOk()) {
assertTrue(
@@ -252,20 +248,20 @@ public class ITNodeTest {
* Test rollback stateMachine with readIndex for issue 317: https://github.com/sofastack/sofa-jraft/issues/317
*/
@Test
- public void testRollbackStateMachineWithReadIndex_Issue317() throws Exception {
- final Endpoint addr = new Endpoint(TestUtils.getMyIp(), TestUtils.INIT_PORT);
- final PeerId peer = new PeerId(addr, 0);
-
- final NodeOptions nodeOptions = createNodeOptions();
- final CountDownLatch applyCompleteLatch = new CountDownLatch(1);
- final CountDownLatch applyLatch = new CountDownLatch(1);
- final CountDownLatch readIndexLatch = new CountDownLatch(1);
- final AtomicInteger currentValue = new AtomicInteger(-1);
- final String errorMsg = this.testName.getMethodName();
- final StateMachine fsm = new StateMachineAdapter() {
+ public void testRollbackStateMachineWithReadIndex_Issue317(TestInfo testInfo) throws Exception {
+ Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
+ PeerId peer = new PeerId(addr, 0);
+
+ NodeOptions nodeOptions = createNodeOptions();
+ CountDownLatch applyCompleteLatch = new CountDownLatch(1);
+ CountDownLatch applyLatch = new CountDownLatch(1);
+ CountDownLatch readIndexLatch = new CountDownLatch(1);
+ AtomicInteger currentValue = new AtomicInteger(-1);
+ String errorMsg = testInfo.getDisplayName();
+ StateMachine fsm = new StateMachineAdapter() {
@Override
- public void onApply(final Iterator iter) {
+ public void onApply(Iterator iter) {
// Notify that the #onApply is preparing to go.
readIndexLatch.countDown();
// Wait for submitting a read-index request
@@ -291,21 +287,20 @@ public class ITNodeTest {
}
};
nodeOptions.setFsm(fsm);
- nodeOptions.setLogUri(this.dataPath + File.separator + "log");
- nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta");
- nodeOptions.setSnapshotUri(this.dataPath + File.separator + "snapshot");
+ nodeOptions.setLogUri(dataPath + File.separator + "log");
+ nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
+ nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer)));
RaftGroupService service = createService("unittest", peer, nodeOptions);
- final Node node = service.start(true);
+ Node node = service.start(true);
assertEquals(1, node.listPeers().size());
assertTrue(node.listPeers().contains(peer));
- while (!node.isLeader()) {
+ while (!node.isLeader())
;
- }
int n = 5;
{
@@ -317,23 +312,23 @@ public class ITNodeTest {
}
}
- final AtomicInteger readIndexSuccesses = new AtomicInteger(0);
+ AtomicInteger readIndexSuccesses = new AtomicInteger(0);
{
// Submit a read-index, wait for #onApply
readIndexLatch.await();
- final CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch latch = new CountDownLatch(1);
node.readIndex(null, new ReadIndexClosure() {
@Override
- public void run(final Status status, final long index, final byte[] reqCtx) {
+ public void run(Status status, long index, byte[] reqCtx) {
try {
- if (status.isOk()) {
+ if (status.isOk())
readIndexSuccesses.incrementAndGet();
- }
else {
- assertTrue("Unexpected status: " + status,
+ assertTrue(
status.getErrorMsg().contains(errorMsg) || status.getRaftError() == RaftError.ETIMEDOUT
- || status.getErrorMsg().contains("Invalid state for readIndex: STATE_ERROR"));
+ || status.getErrorMsg().contains("Invalid state for readIndex: STATE_ERROR"),
+ "Unexpected status: " + status);
}
}
finally {
@@ -346,9 +341,8 @@ public class ITNodeTest {
applyLatch.countDown();
// The state machine is in error state, the node should step down.
- while (node.isLeader()) {
+ while (node.isLeader())
Thread.sleep(10);
- }
latch.await();
applyCompleteLatch.await();
}
@@ -361,15 +355,15 @@ public class ITNodeTest {
@Test
public void testSingleNode() throws Exception {
- final Endpoint addr = new Endpoint(TestUtils.getMyIp(), TestUtils.INIT_PORT);
- final PeerId peer = new PeerId(addr, 0);
+ Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
+ PeerId peer = new PeerId(addr, 0);
- final NodeOptions nodeOptions = createNodeOptions();
- final MockStateMachine fsm = new MockStateMachine(addr);
+ NodeOptions nodeOptions = createNodeOptions();
+ MockStateMachine fsm = new MockStateMachine(addr);
nodeOptions.setFsm(fsm);
- nodeOptions.setLogUri(this.dataPath + File.separator + "log");
- nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta");
- nodeOptions.setSnapshotUri(this.dataPath + File.separator + "snapshot");
+ nodeOptions.setLogUri(dataPath + File.separator + "log");
+ nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
+ nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer)));
RaftGroupService service = createService("unittest", peer, nodeOptions);
@@ -378,86 +372,84 @@ public class ITNodeTest {
assertEquals(1, node.listPeers().size());
assertTrue(node.listPeers().contains(peer));
- while (!node.isLeader()) {
+ while (!node.isLeader())
;
- }
sendTestTaskAndWait(node);
assertEquals(10, fsm.getLogs().size());
int i = 0;
- for (final ByteBuffer data : fsm.getLogs()) {
+ for (ByteBuffer data : fsm.getLogs())
assertEquals("hello" + i++, new String(data.array()));
- }
service.shutdown();
}
@Test
public void testNoLeader() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unittest", this.dataPath, peers);
+ cluster = new TestCluster("unittest", dataPath, peers);
assertTrue(cluster.start(peers.get(0).getEndpoint()));
- final List<Node> followers = cluster.getFollowers();
+ List<Node> followers = cluster.getFollowers();
assertEquals(1, followers.size());
- final Node follower = followers.get(0);
+ Node follower = followers.get(0);
sendTestTaskAndWait(follower, 0, RaftError.EPERM);
// adds a peer3
- final PeerId peer3 = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + 3);
+ PeerId peer3 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 3);
CountDownLatch latch = new CountDownLatch(1);
follower.addPeer(peer3, new ExpectClosure(RaftError.EPERM, latch));
waitLatch(latch);
// remove the peer0
- final PeerId peer0 = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT);
+ PeerId peer0 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
latch = new CountDownLatch(1);
follower.removePeer(peer0, new ExpectClosure(RaftError.EPERM, latch));
waitLatch(latch);
}
- private void sendTestTaskAndWait(final Node node) throws InterruptedException {
- this.sendTestTaskAndWait(node, 0, 10, RaftError.SUCCESS);
+ private void sendTestTaskAndWait(Node node) throws InterruptedException {
+ sendTestTaskAndWait(node, 0, 10, RaftError.SUCCESS);
}
- private void sendTestTaskAndWait(final Node node, int amount) throws InterruptedException {
- this.sendTestTaskAndWait(node, 0, amount, RaftError.SUCCESS);
+ private void sendTestTaskAndWait(Node node, int amount) throws InterruptedException {
+ sendTestTaskAndWait(node, 0, amount, RaftError.SUCCESS);
}
- private void sendTestTaskAndWait(final Node node, final RaftError err) throws InterruptedException {
- this.sendTestTaskAndWait(node, 0, 10, err);
+ private void sendTestTaskAndWait(Node node, RaftError err) throws InterruptedException {
+ sendTestTaskAndWait(node, 0, 10, err);
}
- private void sendTestTaskAndWait(final Node node, final int start, int amount,
- final RaftError err) throws InterruptedException {
- final CountDownLatch latch = new CountDownLatch(amount);
+ private void sendTestTaskAndWait(Node node, int start, int amount,
+ RaftError err) throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(amount);
for (int i = start; i < start + amount; i++) {
- final ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes());
- final Task task = new Task(data, new ExpectClosure(err, latch));
+ ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes());
+ Task task = new Task(data, new ExpectClosure(err, latch));
node.apply(task);
}
waitLatch(latch);
}
- private void sendTestTaskAndWait(final Node node, final int start,
- final RaftError err) throws InterruptedException {
+ private void sendTestTaskAndWait(Node node, int start,
+ RaftError err) throws InterruptedException {
sendTestTaskAndWait(node, start, 10, err);
}
@SuppressWarnings("SameParameterValue")
- private void sendTestTaskAndWait(final String prefix, final Node node, final int code) throws InterruptedException {
+ private void sendTestTaskAndWait(String prefix, Node node, int code) throws InterruptedException {
sendTestTaskAndWait(prefix, node, 10, code);
}
@SuppressWarnings("SameParameterValue")
- private void sendTestTaskAndWait(final String prefix, final Node node, int amount,
- final int code) throws InterruptedException {
- final CountDownLatch latch = new CountDownLatch(10);
+ private void sendTestTaskAndWait(String prefix, Node node, int amount,
+ int code) throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < amount; i++) {
- final ByteBuffer data = ByteBuffer.wrap((prefix + i).getBytes());
- final Task task = new Task(data, new ExpectClosure(code, null, latch));
+ ByteBuffer data = ByteBuffer.wrap((prefix + i).getBytes());
+ Task task = new Task(data, new ExpectClosure(code, null, latch));
node.apply(task);
}
waitLatch(latch);
@@ -465,18 +457,17 @@ public class ITNodeTest {
@Test
public void testTripleNodesWithReplicatorStateListener() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
//final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers);
- final UserReplicatorStateListener listener1 = new UserReplicatorStateListener();
- final UserReplicatorStateListener listener2 = new UserReplicatorStateListener();
+ UserReplicatorStateListener listener1 = new UserReplicatorStateListener();
+ UserReplicatorStateListener listener2 = new UserReplicatorStateListener();
- cluster = new TestCluster("unitest", this.dataPath, peers, new LinkedHashSet<>(), 300,
+ cluster = new TestCluster("unitest", dataPath, peers, new LinkedHashSet<>(), 300,
opts -> opts.setReplicationStateListeners(List.of(listener1, listener2)));
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
// elect leader
cluster.waitLeader();
@@ -484,14 +475,13 @@ public class ITNodeTest {
for (Node follower : cluster.getFollowers())
waitForCondition(() -> follower.getLeaderId() != null, 5_000);
- assertEquals(4, this.startedCounter.get());
+ assertEquals(4, startedCounter.get());
assertEquals(2, cluster.getLeader().getReplicatorStateListeners().size());
assertEquals(2, cluster.getFollowers().get(0).getReplicatorStateListeners().size());
assertEquals(2, cluster.getFollowers().get(1).getReplicatorStateListeners().size());
- for (Node node : cluster.getNodes()) {
+ for (Node node : cluster.getNodes())
node.removeReplicatorStateListener(listener1);
- }
assertEquals(1, cluster.getLeader().getReplicatorStateListeners().size());
assertEquals(1, cluster.getFollowers().get(0).getReplicatorStateListeners().size());
assertEquals(1, cluster.getFollowers().get(1).getReplicatorStateListeners().size());
@@ -499,36 +489,33 @@ public class ITNodeTest {
// TODO asch Broken then using volatile log. A follower with empty log can become a leader IGNITE-14832.
@Test
- @Ignore
+ @Disabled
public void testVoteTimedoutStepDown() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
// elect leader
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
// Stop all followers
List<Node> followers = cluster.getFollowers();
assertFalse(followers.isEmpty());
- for (Node node : followers) {
+ for (Node node : followers)
assertTrue(cluster.stop(node.getNodeId().getPeerId().getEndpoint()));
- }
// Wait leader to step down.
- while (leader.isLeader()) {
+ while (leader.isLeader())
Thread.sleep(10);
- }
// old leader try to elect self, it should fail.
((NodeImpl) leader).tryElectSelf();
@@ -537,29 +524,31 @@ public class ITNodeTest {
assertNull(cluster.getLeader());
// Start followers
- for (Node node : followers) {
+ for (Node node : followers)
assertTrue(cluster.start(node.getNodeId().getPeerId().getEndpoint()));
- }
cluster.ensureSame();
}
class UserReplicatorStateListener implements Replicator.ReplicatorStateListener {
+ /** {@inheritDoc} */
@Override
- public void onCreated(final PeerId peer) {
- int val = ITNodeTest.this.startedCounter.incrementAndGet();
+ public void onCreated(PeerId peer) {
+ int val = startedCounter.incrementAndGet();
LOG.info("Replicator has been created {} {}", peer, val);
}
+ /** {@inheritDoc} */
@Override
- public void onError(final PeerId peer, final Status status) {
+ public void onError(PeerId peer, Status status) {
LOG.info("Replicator has errors {} {}", peer, status);
}
+ /** {@inheritDoc} */
@Override
- public void onDestroyed(final PeerId peer) {
- int val = ITNodeTest.this.stoppedCounter.incrementAndGet();
+ public void onDestroyed(PeerId peer) {
+ int val = stoppedCounter.incrementAndGet();
LOG.info("Replicator has been destroyed {} {}", peer, val);
}
@@ -567,35 +556,33 @@ public class ITNodeTest {
@Test
public void testLeaderTransferWithReplicatorStateListener() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unitest", this.dataPath, peers, new LinkedHashSet<>(), 300,
+ cluster = new TestCluster("unitest", dataPath, peers, new LinkedHashSet<>(), 300,
opts -> opts.setReplicationStateListeners(List.of(new UserReplicatorStateListener())));
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
cluster.waitLeader();
Node leader = cluster.getLeader();
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
Thread.sleep(100);
- final List<Node> followers = cluster.getFollowers();
+ List<Node> followers = cluster.getFollowers();
- assertTrue(this.startedCounter.get() + "", waitForCondition(() -> this.startedCounter.get() == 2, 5_000));
+ assertTrue(waitForCondition(() -> startedCounter.get() == 2, 5_000), startedCounter.get() + "");
- final PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
+ PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
LOG.info("Transfer leadership from {} to {}", leader, targetPeer);
assertTrue(leader.transferLeadershipTo(targetPeer).isOk());
Thread.sleep(1000);
cluster.waitLeader();
- assertTrue(this.startedCounter.get() + "", waitForCondition(() -> this.startedCounter.get() == 4, 5_000));
+ assertTrue(waitForCondition(() -> startedCounter.get() == 4, 5_000), startedCounter.get() + "");
- for (Node node : cluster.getNodes()) {
+ for (Node node : cluster.getNodes())
node.clearReplicatorStateListeners();
- }
assertEquals(0, cluster.getLeader().getReplicatorStateListeners().size());
assertEquals(0, cluster.getFollowers().get(0).getReplicatorStateListeners().size());
assertEquals(0, cluster.getFollowers().get(1).getReplicatorStateListeners().size());
@@ -603,38 +590,37 @@ public class ITNodeTest {
@Test
public void testTripleNodes() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
// elect leader
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
{
- final ByteBuffer data = ByteBuffer.wrap("no closure".getBytes());
- final Task task = new Task(data, null);
+ ByteBuffer data = ByteBuffer.wrap("no closure".getBytes());
+ Task task = new Task(data, null);
leader.apply(task);
}
{
// task with TaskClosure
- final ByteBuffer data = ByteBuffer.wrap("task closure".getBytes());
- final Vector<String> cbs = new Vector<>();
- final CountDownLatch latch = new CountDownLatch(1);
- final Task task = new Task(data, new TaskClosure() {
+ ByteBuffer data = ByteBuffer.wrap("task closure".getBytes());
+ Vector<String> cbs = new Vector<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ Task task = new Task(data, new TaskClosure() {
@Override
- public void run(final Status status) {
+ public void run(Status status) {
cbs.add("apply");
latch.countDown();
}
@@ -658,23 +644,23 @@ public class ITNodeTest {
@Test
public void testSingleNodeWithLearner() throws Exception {
- final Endpoint addr = new Endpoint(TestUtils.getMyIp(), TestUtils.INIT_PORT);
- final PeerId peer = new PeerId(addr, 0);
+ Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
+ PeerId peer = new PeerId(addr, 0);
- final Endpoint learnerAddr = new Endpoint(TestUtils.getMyIp(), TestUtils.INIT_PORT + 1);
- final PeerId learnerPeer = new PeerId(learnerAddr, 0);
+ Endpoint learnerAddr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 1);
+ PeerId learnerPeer = new PeerId(learnerAddr, 0);
final int cnt = 10;
MockStateMachine learnerFsm;
RaftGroupService learnerServer;
{
// Start learner
- final NodeOptions nodeOptions = createNodeOptions();
+ NodeOptions nodeOptions = createNodeOptions();
learnerFsm = new MockStateMachine(learnerAddr);
nodeOptions.setFsm(learnerFsm);
- nodeOptions.setLogUri(this.dataPath + File.separator + "log1");
- nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta1");
- nodeOptions.setSnapshotUri(this.dataPath + File.separator + "snapshot1");
+ nodeOptions.setLogUri(dataPath + File.separator + "log1");
+ nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta1");
+ nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot1");
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer), Collections
.singletonList(learnerPeer)));
@@ -684,12 +670,12 @@ public class ITNodeTest {
{
// Start leader
- final NodeOptions nodeOptions = createNodeOptions();
- final MockStateMachine fsm = new MockStateMachine(addr);
+ NodeOptions nodeOptions = createNodeOptions();
+ MockStateMachine fsm = new MockStateMachine(addr);
nodeOptions.setFsm(fsm);
- nodeOptions.setLogUri(this.dataPath + File.separator + "log");
- nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta");
- nodeOptions.setSnapshotUri(this.dataPath + File.separator + "snapshot");
+ nodeOptions.setLogUri(dataPath + File.separator + "log");
+ nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
+ nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer), Collections
.singletonList(learnerPeer)));
@@ -703,9 +689,8 @@ public class ITNodeTest {
sendTestTaskAndWait(node, cnt);
assertEquals(cnt, fsm.getLogs().size());
int i = 0;
- for (final ByteBuffer data : fsm.getLogs()) {
+ for (ByteBuffer data : fsm.getLogs())
assertEquals("hello" + i++, new String(data.array()));
- }
Thread.sleep(1000); //wait for entries to be replicated to learner.
server.shutdown();
}
@@ -713,31 +698,27 @@ public class ITNodeTest {
// assert learner fsm
assertEquals(cnt, learnerFsm.getLogs().size());
int i = 0;
- for (final ByteBuffer data : learnerFsm.getLogs()) {
+ for (ByteBuffer data : learnerFsm.getLogs())
assertEquals("hello" + i++, new String(data.array()));
- }
learnerServer.shutdown();
}
}
@Test
public void testResetLearners() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- final LinkedHashSet<PeerId> learners = new LinkedHashSet<>();
+ LinkedHashSet<PeerId> learners = new LinkedHashSet<>();
- for (int i = 0; i < 3; i++) {
- learners.add(new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + 3 + i));
- }
+ for (int i = 0; i < 3; i++)
+ learners.add(new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 3 + i));
- cluster = new TestCluster("unittest", this.dataPath, peers, learners, 300);
+ cluster = new TestCluster("unittest", dataPath, peers, learners, 300);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
- for (final PeerId peer : learners) {
+ for (PeerId peer : learners)
assertTrue(cluster.startLearner(peer));
- }
// elect leader
cluster.waitLeader();
@@ -747,7 +728,7 @@ public class ITNodeTest {
waitForCondition(() -> leader.listAlivePeers().size() == 3, 5_000);
waitForCondition(() -> leader.listAliveLearners().size() == 3, 5_000);
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
Thread.sleep(500);
List<MockStateMachine> fsms = cluster.getFsms();
assertEquals(6, fsms.size());
@@ -755,7 +736,7 @@ public class ITNodeTest {
{
// Reset learners to 2 nodes
- PeerId learnerPeer = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + 3);
+ PeerId learnerPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 3);
learners.remove(learnerPeer);
assertEquals(2, learners.size());
@@ -764,7 +745,7 @@ public class ITNodeTest {
assertTrue(done.await().isOk());
assertEquals(2, leader.listAliveLearners().size());
assertEquals(2, leader.listLearners().size());
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
Thread.sleep(500);
assertEquals(6, fsms.size());
@@ -777,12 +758,12 @@ public class ITNodeTest {
}
{
// remove another learner
- PeerId learnerPeer = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + 4);
+ PeerId learnerPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 4);
SynchronizedClosure done = new SynchronizedClosure();
leader.removeLearners(Arrays.asList(learnerPeer), done);
assertTrue(done.await().isOk());
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
Thread.sleep(500);
MockStateMachine fsm = fsms.remove(3); // get the removed learner's fsm
assertEquals(fsm.getAddress(), learnerPeer.getEndpoint());
@@ -798,24 +779,23 @@ public class ITNodeTest {
@Test
public void testTripleNodesWithStaticLearners() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unittest", this.dataPath, peers);
+ cluster = new TestCluster("unittest", dataPath, peers);
LinkedHashSet<PeerId> learners = new LinkedHashSet<>();
- PeerId learnerPeer = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + 3);
+ PeerId learnerPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 3);
learners.add(learnerPeer);
cluster.setLearners(learners);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
// elect leader
cluster.waitLeader();
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertEquals(3, leader.listPeers().size());
- assertEquals(leader.listLearners().size(), 1);
+ assertEquals(1, leader.listLearners().size());
assertTrue(leader.listLearners().contains(learnerPeer));
assertTrue(leader.listAliveLearners().isEmpty());
@@ -825,11 +805,11 @@ public class ITNodeTest {
Thread.sleep(1000);
assertEquals(3, leader.listPeers().size());
- assertEquals(leader.listLearners().size(), 1);
- assertEquals(leader.listAliveLearners().size(), 1);
+ assertEquals(1, leader.listLearners().size());
+ assertEquals(1, leader.listAliveLearners().size());
// apply tasks to leader
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
cluster.ensureSame();
assertEquals(4, cluster.getFsms().size());
@@ -837,18 +817,17 @@ public class ITNodeTest {
@Test
public void testTripleNodesWithLearners() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
// elect leader
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
assertTrue(leader.listLearners().isEmpty());
@@ -857,7 +836,7 @@ public class ITNodeTest {
{
// Adds a learner
SynchronizedClosure done = new SynchronizedClosure();
- PeerId learnerPeer = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + 3);
+ PeerId learnerPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 3);
// Start learner
assertTrue(cluster.startLearner(learnerPeer));
leader.addLearners(Arrays.asList(learnerPeer), done);
@@ -867,23 +846,23 @@ public class ITNodeTest {
}
// apply tasks to leader
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
{
- final ByteBuffer data = ByteBuffer.wrap("no closure".getBytes());
- final Task task = new Task(data, null);
+ ByteBuffer data = ByteBuffer.wrap("no closure".getBytes());
+ Task task = new Task(data, null);
leader.apply(task);
}
{
// task with TaskClosure
- final ByteBuffer data = ByteBuffer.wrap("task closure".getBytes());
- final Vector<String> cbs = new Vector<>();
- final CountDownLatch latch = new CountDownLatch(1);
- final Task task = new Task(data, new TaskClosure() {
+ ByteBuffer data = ByteBuffer.wrap("task closure".getBytes());
+ Vector<String> cbs = new Vector<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ Task task = new Task(data, new TaskClosure() {
@Override
- public void run(final Status status) {
+ public void run(Status status) {
cbs.add("apply");
latch.countDown();
}
@@ -909,7 +888,7 @@ public class ITNodeTest {
{
// Adds another learner
SynchronizedClosure done = new SynchronizedClosure();
- PeerId learnerPeer = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + 4);
+ PeerId learnerPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 4);
// Start learner
assertTrue(cluster.startLearner(learnerPeer));
leader.addLearners(Arrays.asList(learnerPeer), done);
@@ -919,11 +898,10 @@ public class ITNodeTest {
}
{
// stop two followers
- for (Node follower : cluster.getFollowers()) {
+ for (Node follower : cluster.getFollowers())
assertTrue(cluster.stop(follower.getNodeId().getPeerId().getEndpoint()));
- }
// send a new task
- final ByteBuffer data = ByteBuffer.wrap("task closure".getBytes());
+ ByteBuffer data = ByteBuffer.wrap("task closure".getBytes());
SynchronizedClosure done = new SynchronizedClosure();
leader.apply(new Task(data, done));
// should fail
@@ -943,18 +921,17 @@ public class ITNodeTest {
priorities.add(40);
priorities.add(40);
- final List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
+ List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), peer.getPriority()));
- }
// elect leader
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
assertEquals(100, leader.getNodeTargetPriority());
@@ -970,18 +947,17 @@ public class ITNodeTest {
priorities.add(40);
priorities.add(-1);
- final List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
+ List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), peer.getPriority()));
- }
// elect leader
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
assertEquals(2, cluster.getFollowers().size());
@@ -990,23 +966,22 @@ public class ITNodeTest {
@Test
public void testNodesWithSpecialPriorityElection() throws Exception {
- List<Integer> priorities = new ArrayList<Integer>();
+ List<Integer> priorities = new ArrayList<>();
priorities.add(0);
priorities.add(0);
priorities.add(-1);
- final List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
+ List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), peer.getPriority()));
- }
// elect leader
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
assertEquals(2, cluster.getFollowers().size());
@@ -1015,23 +990,22 @@ public class ITNodeTest {
@Test
public void testNodesWithZeroValPriorityElection() throws Exception {
- List<Integer> priorities = new ArrayList<Integer>();
+ List<Integer> priorities = new ArrayList<>();
priorities.add(50);
priorities.add(0);
priorities.add(0);
- final List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
+ List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), peer.getPriority()));
- }
// elect leader
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
assertEquals(2, cluster.getFollowers().size());
@@ -1046,36 +1020,33 @@ public class ITNodeTest {
priorities.add(0);
priorities.add(0);
- final List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
+ List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), peer.getPriority()));
- }
Thread.sleep(200);
- final List<Node> followers = cluster.getFollowers();
+ List<Node> followers = cluster.getFollowers();
assertEquals(3, followers.size());
- for (Node follower : followers) {
+ for (Node follower : followers)
assertEquals(0, follower.getNodeId().getPeerId().getPriority());
- }
}
@Test
public void testLeaderStopAndReElectWithPriority() throws Exception {
- final List<Integer> priorities = new ArrayList<>();
+ List<Integer> priorities = new ArrayList<>();
priorities.add(100);
priorities.add(60);
priorities.add(10);
- final List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
+ List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), peer.getPriority()));
- }
cluster.waitLeader();
Node leader = cluster.getLeader();
@@ -1116,17 +1087,16 @@ public class ITNodeTest {
@Test
public void testRemoveLeaderWithPriority() throws Exception {
- final List<Integer> priorities = new ArrayList<Integer>();
+ List<Integer> priorities = new ArrayList<>();
priorities.add(100);
priorities.add(60);
priorities.add(10);
- final List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
+ List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), peer.getPriority()));
- }
// elect leader
cluster.waitLeader();
@@ -1137,11 +1107,11 @@ public class ITNodeTest {
assertEquals(100, leader.getNodeTargetPriority());
assertEquals(100, leader.getNodeId().getPeerId().getPriority());
- final List<Node> followers = cluster.getFollowers();
+ List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
- final PeerId oldLeader = leader.getNodeId().getPeerId().copy();
- final Endpoint oldLeaderAddr = oldLeader.getEndpoint();
+ PeerId oldLeader = leader.getNodeId().getPeerId().copy();
+ Endpoint oldLeaderAddr = oldLeader.getEndpoint();
// remove old leader
LOG.info("Remove old leader {}", oldLeader);
@@ -1164,25 +1134,24 @@ public class ITNodeTest {
}
@Test
- @Ignore // TODO asch https://issues.apache.org/jira/browse/IGNITE-14833
+ @Disabled // TODO asch https://issues.apache.org/jira/browse/IGNITE-14833
public void testChecksum() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
// start with checksum validation
{
- final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers);
+ TestCluster cluster = new TestCluster("unittest", dataPath, peers);
try {
- final RaftOptions raftOptions = new RaftOptions();
+ RaftOptions raftOptions = new RaftOptions();
raftOptions.setEnableLogEntryChecksum(true);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), false, 300, true, null, raftOptions));
- }
cluster.waitLeader();
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
cluster.ensureSame();
}
finally {
@@ -1192,11 +1161,11 @@ public class ITNodeTest {
// restart with peer3 enable checksum validation
{
- final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers);
+ TestCluster cluster = new TestCluster("unittest", dataPath, peers);
try {
RaftOptions raftOptions = new RaftOptions();
raftOptions.setEnableLogEntryChecksum(false);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers) {
if (peer.equals(peers.get(2))) {
raftOptions = new RaftOptions();
raftOptions.setEnableLogEntryChecksum(true);
@@ -1205,10 +1174,10 @@ public class ITNodeTest {
}
cluster.waitLeader();
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
cluster.ensureSame();
}
finally {
@@ -1218,19 +1187,18 @@ public class ITNodeTest {
// restart with no checksum validation
{
- final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers);
+ TestCluster cluster = new TestCluster("unittest", dataPath, peers);
try {
- final RaftOptions raftOptions = new RaftOptions();
+ RaftOptions raftOptions = new RaftOptions();
raftOptions.setEnableLogEntryChecksum(false);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), false, 300, true, null, raftOptions));
- }
cluster.waitLeader();
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
cluster.ensureSame();
}
finally {
@@ -1240,19 +1208,18 @@ public class ITNodeTest {
// restart with all peers enable checksum validation
{
- final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers);
+ TestCluster cluster = new TestCluster("unittest", dataPath, peers);
try {
- final RaftOptions raftOptions = new RaftOptions();
+ RaftOptions raftOptions = new RaftOptions();
raftOptions.setEnableLogEntryChecksum(true);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), false, 300, true, null, raftOptions));
- }
cluster.waitLeader();
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
cluster.ensureSame();
}
finally {
@@ -1264,30 +1231,28 @@ public class ITNodeTest {
@Test
public void testReadIndex() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), false, 300, true));
- }
// elect leader
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
// first call will fail-fast when no connection
- if (!assertReadIndex(leader, 11)) {
+ if (!assertReadIndex(leader, 11))
assertTrue(assertReadIndex(leader, 11));
- }
// read from follower
- for (final Node follower : cluster.getFollowers()) {
+ for (Node follower : cluster.getFollowers()) {
assertNotNull(follower);
assertTrue(waitForCondition(() -> leader.getNodeId().getPeerId().equals(follower.getLeaderId()), 5_000));
@@ -1296,11 +1261,11 @@ public class ITNodeTest {
}
// read with null request context
- final CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch latch = new CountDownLatch(1);
leader.readIndex(null, new ReadIndexClosure() {
@Override
- public void run(final Status status, final long index, final byte[] reqCtx) {
+ public void run(Status status, long index, byte[] reqCtx) {
assertNull(reqCtx);
assertTrue(status.isOk());
latch.countDown();
@@ -1311,30 +1276,28 @@ public class ITNodeTest {
@Test // TODO asch do we need read index timeout ? https://issues.apache.org/jira/browse/IGNITE-14832
public void testReadIndexTimeout() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), false, 300, true));
- }
// elect leader
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
sendTestTaskAndWait(leader);
// first call will fail-fast when no connection
- if (!assertReadIndex(leader, 11)) {
+ if (!assertReadIndex(leader, 11))
assertTrue(assertReadIndex(leader, 11));
- }
// read from follower
- for (final Node follower : cluster.getFollowers()) {
+ for (Node follower : cluster.getFollowers()) {
assertNotNull(follower);
assertTrue(waitForCondition(() -> leader.getNodeId().getPeerId().equals(follower.getLeaderId()), 5_000));
@@ -1343,19 +1306,18 @@ public class ITNodeTest {
}
// read with null request context
- final CountDownLatch latch = new CountDownLatch(1);
- final long start = System.currentTimeMillis();
+ CountDownLatch latch = new CountDownLatch(1);
+ long start = System.currentTimeMillis();
leader.readIndex(null, new ReadIndexClosure() {
@Override
- public void run(final Status status, final long index, final byte[] reqCtx) {
+ public void run(Status status, long index, byte[] reqCtx) {
assertNull(reqCtx);
- if (status.isOk()) {
+ if (status.isOk())
System.err.println("Read-index so fast: " + (System.currentTimeMillis() - start) + "ms");
- }
else {
- assertEquals(status, new Status(RaftError.ETIMEDOUT, "read-index request timeout"));
- assertEquals(index, -1);
+ assertEquals(new Status(RaftError.ETIMEDOUT, "read-index request timeout"), status);
+ assertEquals(-1, index);
}
latch.countDown();
}
@@ -1365,27 +1327,26 @@ public class ITNodeTest {
@Test
public void testReadIndexFromLearner() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), false, 300, true));
- }
// elect leader
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
{
// Adds a learner
SynchronizedClosure done = new SynchronizedClosure();
- PeerId learnerPeer = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + 3);
+ PeerId learnerPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 3);
// Start learner
assertTrue(cluster.startLearner(learnerPeer));
leader.addLearners(Arrays.asList(learnerPeer), done);
@@ -1404,22 +1365,21 @@ public class ITNodeTest {
@Test
public void testReadIndexChaos() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), false, 300, true));
- }
// elect leader
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
- final CountDownLatch latch = new CountDownLatch(10);
+ CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread() {
@Override
@@ -1429,7 +1389,7 @@ public class ITNodeTest {
try {
sendTestTaskAndWait(leader);
}
- catch (final InterruptedException e) {
+ catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
readIndexRandom(cluster);
@@ -1440,16 +1400,16 @@ public class ITNodeTest {
}
}
- private void readIndexRandom(final TestCluster cluster) {
- final CountDownLatch readLatch = new CountDownLatch(1);
- final byte[] requestContext = TestUtils.getRandomBytes();
+ private void readIndexRandom(TestCluster cluster) {
+ CountDownLatch readLatch = new CountDownLatch(1);
+ byte[] requestContext = TestUtils.getRandomBytes();
cluster.getNodes().get(ThreadLocalRandom.current().nextInt(3))
.readIndex(requestContext, new ReadIndexClosure() {
@Override
- public void run(final Status status, final long index, final byte[] reqCtx) {
+ public void run(Status status, long index, byte[] reqCtx) {
if (status.isOk()) {
- assertTrue(status.toString(), status.isOk());
+ assertTrue(status.isOk(), status.toString());
assertTrue(index > 0);
assertArrayEquals(requestContext, reqCtx);
}
@@ -1459,7 +1419,7 @@ public class ITNodeTest {
try {
readLatch.await();
}
- catch (final InterruptedException e) {
+ catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@@ -1470,28 +1430,27 @@ public class ITNodeTest {
cluster.ensureSame();
- for (final MockStateMachine fsm : cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(10000, fsm.getLogs().size());
- }
}
@SuppressWarnings({"unused", "SameParameterValue"})
- private boolean assertReadIndex(final Node node, final int index) throws InterruptedException {
- final CountDownLatch latch = new CountDownLatch(1);
- final byte[] requestContext = TestUtils.getRandomBytes();
- final AtomicBoolean success = new AtomicBoolean(false);
+ private boolean assertReadIndex(Node node, int index) throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(1);
+ byte[] requestContext = TestUtils.getRandomBytes();
+ AtomicBoolean success = new AtomicBoolean(false);
node.readIndex(requestContext, new ReadIndexClosure() {
@Override
- public void run(final Status status, final long theIndex, final byte[] reqCtx) {
+ public void run(Status status, long theIndex, byte[] reqCtx) {
if (status.isOk()) {
assertEquals(index, theIndex);
assertArrayEquals(requestContext, reqCtx);
success.set(true);
}
else {
- assertTrue(status.getErrorMsg(), status.getErrorMsg().contains("RPC exception:Check connection["));
- assertTrue(status.getErrorMsg(), status.getErrorMsg().contains("] fail and try to create new one"));
+ assertTrue(status.getErrorMsg().contains("RPC exception:Check connection["), status.getErrorMsg());
+ assertTrue(status.getErrorMsg().contains("] fail and try to create new one"), status.getErrorMsg());
}
latch.countDown();
}
@@ -1502,33 +1461,32 @@ public class ITNodeTest {
@Test
public void testNodeMetrics() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), false, 300, true));
- }
// elect leader
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
{
- final ByteBuffer data = ByteBuffer.wrap("no closure".getBytes());
- final Task task = new Task(data, null);
+ ByteBuffer data = ByteBuffer.wrap("no closure".getBytes());
+ Task task = new Task(data, null);
leader.apply(task);
}
cluster.ensureSame();
- for (final Node node : cluster.getNodes()) {
+ for (Node node : cluster.getNodes()) {
System.out.println("-------------" + node.getNodeId() + "-------------");
- final ConsoleReporter reporter = ConsoleReporter.forRegistry(node.getNodeMetrics().getMetricRegistry())
+ ConsoleReporter reporter = ConsoleReporter.forRegistry(node.getNodeMetrics().getMetricRegistry())
.build();
reporter.report();
reporter.close();
@@ -1540,12 +1498,11 @@ public class ITNodeTest {
@Test
public void testLeaderFail() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
// elect leader
cluster.waitLeader();
@@ -1555,7 +1512,7 @@ public class ITNodeTest {
assertNotNull(leader);
LOG.info("Current leader is {}", leader.getLeaderId());
// apply tasks to leader
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
List<Node> followers = cluster.getFollowers();
@@ -1563,10 +1520,10 @@ public class ITNodeTest {
NodeImpl follower0 = (NodeImpl) follower;
DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService();
RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient();
- rpcClientEx.blockMessages(new BiPredicate<Object, String>() {
+ rpcClientEx.blockMessages(new BiPredicate<>() {
@Override public boolean test(Object msg, String nodeId) {
if (msg instanceof RpcRequests.RequestVoteRequest) {
- RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest) msg;
+ RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest)msg;
return !msg0.getPreVote();
}
@@ -1578,13 +1535,13 @@ public class ITNodeTest {
// stop leader
LOG.warn("Stop leader {}", leader.getNodeId().getPeerId());
- final PeerId oldLeader = leader.getNodeId().getPeerId();
+ PeerId oldLeader = leader.getNodeId().getPeerId();
assertTrue(cluster.stop(leader.getNodeId().getPeerId().getEndpoint()));
// apply something when follower
//final List<Node> followers = cluster.getFollowers();
assertFalse(followers.isEmpty());
- this.sendTestTaskAndWait("follower apply ", followers.get(0), -1);
+ sendTestTaskAndWait("follower apply ", followers.get(0), -1);
for (Node follower : followers) {
NodeImpl follower0 = (NodeImpl) follower;
@@ -1600,8 +1557,8 @@ public class ITNodeTest {
// apply tasks to new leader
CountDownLatch latch = new CountDownLatch(10);
for (int i = 10; i < 20; i++) {
- final ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes());
- final Task task = new Task(data, new ExpectClosure(latch));
+ ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes());
+ Task task = new Task(data, new ExpectClosure(latch));
leader.apply(task);
}
waitLatch(latch);
@@ -1612,8 +1569,8 @@ public class ITNodeTest {
// apply something
latch = new CountDownLatch(10);
for (int i = 20; i < 30; i++) {
- final ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes());
- final Task task = new Task(data, new ExpectClosure(latch));
+ ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes());
+ Task task = new Task(data, new ExpectClosure(latch));
leader.apply(task);
}
waitLatch(latch);
@@ -1627,31 +1584,30 @@ public class ITNodeTest {
assertTrue(cluster.start(oldLeader.getEndpoint()));
cluster.ensureSame();
- for (final MockStateMachine fsm : cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(30, fsm.getLogs().size());
- }
}
@Test
public void testJoinNodes() throws Exception {
- final PeerId peer0 = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT);
- final PeerId peer1 = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + 1);
- final PeerId peer2 = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + 2);
- final PeerId peer3 = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + 3);
+ PeerId peer0 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
+ PeerId peer1 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 1);
+ PeerId peer2 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 2);
+ PeerId peer3 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 3);
- final ArrayList<PeerId> peers = new ArrayList<>();
+ ArrayList<PeerId> peers = new ArrayList<>();
peers.add(peer0);
// start single cluster
- cluster = new TestCluster("unittest", this.dataPath, peers);
+ cluster = new TestCluster("unittest", dataPath, peers);
assertTrue(cluster.start(peer0.getEndpoint()));
cluster.waitLeader();
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
- Assert.assertEquals(leader.getNodeId().getPeerId(), peer0);
- this.sendTestTaskAndWait(leader);
+ assertEquals(leader.getNodeId().getPeerId(), peer0);
+ sendTestTaskAndWait(leader);
// start peer1
assertTrue(cluster.start(peer1.getEndpoint(), false, 300));
@@ -1664,9 +1620,8 @@ public class ITNodeTest {
cluster.ensureSame();
assertEquals(2, cluster.getFsms().size());
- for (final MockStateMachine fsm : cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(10, fsm.getLogs().size());
- }
// add peer2 but not start
peers.add(peer2);
@@ -1691,19 +1646,18 @@ public class ITNodeTest {
leader.addPeer(peer2, new ExpectClosure(latch));
fail();
}
- catch (final IllegalArgumentException e) {
+ catch (IllegalArgumentException e) {
assertEquals("Peer already exists in current configuration", e.getMessage());
}
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
assertEquals(2, cluster.getFollowers().size());
- for (final MockStateMachine fsm : cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(10, fsm.getLogs().size());
- }
}
- private void waitLatch(final CountDownLatch latch) throws InterruptedException {
+ private void waitLatch(CountDownLatch latch) throws InterruptedException {
assertTrue(latch.await(30, TimeUnit.SECONDS));
}
@@ -1711,27 +1665,26 @@ public class ITNodeTest {
public void testRemoveFollower() throws Exception {
List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
// elect leader
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
// apply tasks to leader
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
cluster.ensureSame();
List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
- final PeerId followerPeer = followers.get(0).getNodeId().getPeerId();
- final Endpoint followerAddr = followerPeer.getEndpoint();
+ PeerId followerPeer = followers.get(0).getNodeId().getPeerId();
+ Endpoint followerAddr = followerPeer.getEndpoint();
// stop and clean follower
LOG.info("Stop and clean follower {}", followerPeer);
@@ -1744,7 +1697,7 @@ public class ITNodeTest {
leader.removePeer(followerPeer, new ExpectClosure(latch));
waitLatch(latch);
- this.sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
+ sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
followers = cluster.getFollowers();
assertEquals(1, followers.size());
@@ -1764,19 +1717,17 @@ public class ITNodeTest {
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
- for (final MockStateMachine fsm : cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(20, fsm.getLogs().size());
- }
}
@Test
public void testRemoveLeader() throws Exception {
List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unittest", this.dataPath, peers);
- for (final PeerId peer : peers) {
+ cluster = new TestCluster("unittest", dataPath, peers);
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
// elect leader
cluster.waitLeader();
@@ -1785,15 +1736,15 @@ public class ITNodeTest {
Node leader = cluster.getLeader();
assertNotNull(leader);
// apply tasks to leader
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
cluster.ensureSame();
List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
- final PeerId oldLeader = leader.getNodeId().getPeerId().copy();
- final Endpoint oldLeaderAddr = oldLeader.getEndpoint();
+ PeerId oldLeader = leader.getNodeId().getPeerId().copy();
+ Endpoint oldLeaderAddr = oldLeader.getEndpoint();
// remove old leader
LOG.info("Remove old leader {}", oldLeader);
@@ -1808,7 +1759,7 @@ public class ITNodeTest {
LOG.info("New leader is {}", leader);
assertNotNull(leader);
// apply tasks to new leader
- this.sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
+ sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
// stop and clean old leader
LOG.info("Stop and clean old leader {}", oldLeader);
@@ -1829,36 +1780,34 @@ public class ITNodeTest {
assertEquals(2, followers.size());
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
- for (final MockStateMachine fsm : cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(20, fsm.getLogs().size());
- }
}
@Test
public void testPreVote() throws Exception {
List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unitest", this.dataPath, peers);
+ cluster = new TestCluster("unitest", dataPath, peers);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
cluster.waitLeader();
// get leader
Node leader = cluster.getLeader();
- final long savedTerm = ((NodeImpl) leader).getCurrentTerm();
+ long savedTerm = ((NodeImpl) leader).getCurrentTerm();
assertNotNull(leader);
// apply tasks to leader
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
cluster.ensureSame();
- final List<Node> followers = cluster.getFollowers();
+ List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
- final PeerId followerPeer = followers.get(0).getNodeId().getPeerId();
- final Endpoint followerAddr = followerPeer.getEndpoint();
+ PeerId followerPeer = followers.get(0).getNodeId().getPeerId();
+ Endpoint followerAddr = followerPeer.getEndpoint();
// remove follower
LOG.info("Remove follower {}", followerPeer);
@@ -1866,7 +1815,7 @@ public class ITNodeTest {
leader.removePeer(followerPeer, new ExpectClosure(latch));
waitLatch(latch);
- this.sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
+ sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
Thread.sleep(2000);
@@ -1885,14 +1834,14 @@ public class ITNodeTest {
@Test
public void testSetPeer1() throws Exception {
- cluster = new TestCluster("testSetPeer1", this.dataPath, new ArrayList<>());
+ cluster = new TestCluster("testSetPeer1", dataPath, new ArrayList<>());
- final PeerId bootPeer = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT);
+ PeerId bootPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
assertTrue(cluster.start(bootPeer.getEndpoint()));
- final List<Node> nodes = cluster.getFollowers();
+ List<Node> nodes = cluster.getFollowers();
assertEquals(1, nodes.size());
- final List<PeerId> peers = new ArrayList<>();
+ List<PeerId> peers = new ArrayList<>();
peers.add(bootPeer);
// reset peers from empty
assertTrue(nodes.get(0).resetPeers(new Configuration(peers)).isOk());
@@ -1902,39 +1851,38 @@ public class ITNodeTest {
@Test
public void testSetPeer2() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unitest", this.dataPath, peers);
+ cluster = new TestCluster("unitest", dataPath, peers);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
cluster.waitLeader();
// get leader
Node leader = cluster.getLeader();
assertNotNull(leader);
// apply tasks to leader
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
cluster.ensureSame();
- final List<Node> followers = cluster.getFollowers();
+ List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
- final PeerId followerPeer1 = followers.get(0).getNodeId().getPeerId();
- final Endpoint followerAddr1 = followerPeer1.getEndpoint();
- final PeerId followerPeer2 = followers.get(1).getNodeId().getPeerId();
- final Endpoint followerAddr2 = followerPeer2.getEndpoint();
+ PeerId followerPeer1 = followers.get(0).getNodeId().getPeerId();
+ Endpoint followerAddr1 = followerPeer1.getEndpoint();
+ PeerId followerPeer2 = followers.get(1).getNodeId().getPeerId();
+ Endpoint followerAddr2 = followerPeer2.getEndpoint();
LOG.info("Stop and clean follower {}", followerPeer1);
assertTrue(cluster.stop(followerAddr1));
cluster.clean(followerAddr1);
// apply tasks to leader again
- this.sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
+ sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
// set peer when no quorum die
- final Endpoint leaderAddr = leader.getLeaderId().getEndpoint().copy();
+ Endpoint leaderAddr = leader.getLeaderId().getEndpoint().copy();
LOG.info("Set peers to {}", leaderAddr);
LOG.info("Stop and clean follower {}", followerPeer2);
@@ -1957,7 +1905,7 @@ public class ITNodeTest {
cluster.waitLeader();
leader = cluster.getLeader();
assertNotNull(leader);
- Assert.assertEquals(leaderAddr, leader.getNodeId().getPeerId().getEndpoint());
+ assertEquals(leaderAddr, leader.getNodeId().getPeerId().getEndpoint());
LOG.info("start follower {}", followerAddr1);
assertTrue(cluster.start(followerAddr1, true, 300));
@@ -1982,9 +1930,8 @@ public class ITNodeTest {
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
- for (final MockStateMachine fsm : cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(20, fsm.getLogs().size());
- }
}
/**
@@ -1992,29 +1939,28 @@ public class ITNodeTest {
*/
@Test
public void testRestoreSnasphot() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unitest", this.dataPath, peers);
+ cluster = new TestCluster("unitest", dataPath, peers);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
LOG.info("Leader: " + leader);
assertNotNull(leader);
// apply tasks to leader
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
cluster.ensureSame();
triggerLeaderSnapshot(cluster, leader);
// stop leader
- final Endpoint leaderAddr = leader.getNodeId().getPeerId().getEndpoint().copy();
+ Endpoint leaderAddr = leader.getNodeId().getPeerId().getEndpoint().copy();
assertTrue(cluster.stop(leaderAddr));
// restart leader
@@ -2030,29 +1976,28 @@ public class ITNodeTest {
*/
@Test
public void testRestoreSnapshotWithDelta() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unitest", this.dataPath, peers);
+ cluster = new TestCluster("unitest", dataPath, peers);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
LOG.info("Leader: " + leader);
assertNotNull(leader);
// apply tasks to leader
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
cluster.ensureSame();
triggerLeaderSnapshot(cluster, leader);
// stop leader
- final Endpoint leaderAddr = leader.getNodeId().getPeerId().getEndpoint().copy();
+ Endpoint leaderAddr = leader.getNodeId().getPeerId().getEndpoint().copy();
assertTrue(cluster.stop(leaderAddr));
// restart leader
@@ -2072,18 +2017,17 @@ public class ITNodeTest {
assertEquals(1, fsm.getLoadSnapshotTimes());
}
- private void triggerLeaderSnapshot(final TestCluster cluster, final Node leader) throws InterruptedException {
- this.triggerLeaderSnapshot(cluster, leader, 1);
+ private void triggerLeaderSnapshot(TestCluster cluster, Node leader) throws InterruptedException {
+ triggerLeaderSnapshot(cluster, leader, 1);
}
- private void triggerLeaderSnapshot(final TestCluster cluster, final Node leader, final int times)
+ private void triggerLeaderSnapshot(TestCluster cluster, Node leader, int times)
throws InterruptedException {
// trigger leader snapshot
// first snapshot will be triggered randomly
int snapshotTimes = cluster.getLeaderFsm().getSaveSnapshotTimes();
- assertTrue("snapshotTimes=" + snapshotTimes + ", times=" + times, snapshotTimes == times - 1
- || snapshotTimes == times);
- final CountDownLatch latch = new CountDownLatch(1);
+ assertTrue(snapshotTimes == times - 1 || snapshotTimes == times, "snapshotTimes=" + snapshotTimes + ", times=" + times);
+ CountDownLatch latch = new CountDownLatch(1);
leader.snapshot(new ExpectClosure(latch));
waitLatch(latch);
assertEquals(snapshotTimes + 1, cluster.getLeaderFsm().getSaveSnapshotTimes());
@@ -2091,41 +2035,40 @@ public class ITNodeTest {
@Test
public void testInstallSnapshotWithThrottle() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unitest", this.dataPath, peers);
+ cluster = new TestCluster("unitest", dataPath, peers);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), false, 200, false, new ThroughputSnapshotThrottle(1024, 1)));
- }
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
// apply tasks to leader
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
cluster.ensureSame();
// stop follower1
- final List<Node> followers = cluster.getFollowers();
+ List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
- final Endpoint followerAddr = followers.get(0).getNodeId().getPeerId().getEndpoint();
+ Endpoint followerAddr = followers.get(0).getNodeId().getPeerId().getEndpoint();
assertTrue(cluster.stop(followerAddr));
cluster.waitLeader();
// apply something more
- this.sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
+ sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
Thread.sleep(1000);
// trigger leader snapshot
triggerLeaderSnapshot(cluster, leader);
// apply something more
- this.sendTestTaskAndWait(leader, 20, RaftError.SUCCESS);
+ sendTestTaskAndWait(leader, 20, RaftError.SUCCESS);
// trigger leader snapshot
triggerLeaderSnapshot(cluster, leader, 2);
@@ -2140,23 +2083,22 @@ public class ITNodeTest {
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
- for (final MockStateMachine fsm : cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(30, fsm.getLogs().size());
- }
}
@Test // TODO add test for timeout on snapshot install https://issues.apache.org/jira/browse/IGNITE-14832
public void testInstallLargeSnapshotWithThrottle() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(4);
- cluster = new TestCluster("unitest", this.dataPath, peers.subList(0, 3));
+ List<PeerId> peers = TestUtils.generatePeers(4);
+ cluster = new TestCluster("unitest", dataPath, peers.subList(0, 3));
for (int i = 0; i < peers.size() - 1; i++) {
- final PeerId peer = peers.get(i);
- final boolean started = cluster.start(peer.getEndpoint(), false, 200, false);
+ PeerId peer = peers.get(i);
+ boolean started = cluster.start(peer.getEndpoint(), false, 200, false);
assertTrue(started);
}
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
// apply tasks to leader
sendTestTaskAndWait(leader, 0, RaftError.SUCCESS);
@@ -2164,9 +2106,8 @@ public class ITNodeTest {
cluster.ensureSame();
// apply something more
- for (int i = 1; i < 100; i++) {
+ for (int i = 1; i < 100; i++)
sendTestTaskAndWait(leader, i * 10, RaftError.SUCCESS);
- }
Thread.sleep(1000);
@@ -2174,9 +2115,8 @@ public class ITNodeTest {
triggerLeaderSnapshot(cluster, leader);
// apply something more
- for (int i = 100; i < 200; i++) {
+ for (int i = 100; i < 200; i++)
sendTestTaskAndWait(leader, i * 10, RaftError.SUCCESS);
- }
// trigger leader snapshot
triggerLeaderSnapshot(cluster, leader, 2);
@@ -2184,14 +2124,14 @@ public class ITNodeTest {
Thread.sleep(1000);
// add follower
- final PeerId newPeer = peers.get(3);
- final SnapshotThrottle snapshotThrottle = new ThroughputSnapshotThrottle(128, 1);
- final boolean started = cluster.start(newPeer.getEndpoint(), false, 300, false, snapshotThrottle);
+ PeerId newPeer = peers.get(3);
+ SnapshotThrottle snapshotThrottle = new ThroughputSnapshotThrottle(128, 1);
+ boolean started = cluster.start(newPeer.getEndpoint(), false, 300, false, snapshotThrottle);
assertTrue(started);
- final CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch latch = new CountDownLatch(1);
leader.addPeer(newPeer, status -> {
- assertTrue(status.toString(), status.isOk());
+ assertTrue(status.isOk(), status.toString());
latch.countDown();
});
waitLatch(latch);
@@ -2199,23 +2139,22 @@ public class ITNodeTest {
cluster.ensureSame();
assertEquals(4, cluster.getFsms().size());
- for (final MockStateMachine fsm : cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(2000, fsm.getLogs().size());
- }
}
@Test
public void testInstallLargeSnapshot() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(4);
- cluster = new TestCluster("unitest", this.dataPath, peers.subList(0, 3));
+ List<PeerId> peers = TestUtils.generatePeers(4);
+ cluster = new TestCluster("unitest", dataPath, peers.subList(0, 3));
for (int i = 0; i < peers.size() - 1; i++) {
- final PeerId peer = peers.get(i);
- final boolean started = cluster.start(peer.getEndpoint(), false, 200, false);
+ PeerId peer = peers.get(i);
+ boolean started = cluster.start(peer.getEndpoint(), false, 200, false);
assertTrue(started);
}
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
// apply tasks to leader
sendTestTaskAndWait(leader, 0, RaftError.SUCCESS);
@@ -2223,9 +2162,8 @@ public class ITNodeTest {
cluster.ensureSame();
// apply something more
- for (int i = 1; i < 100; i++) {
+ for (int i = 1; i < 100; i++)
sendTestTaskAndWait(leader, i * 10, RaftError.SUCCESS);
- }
Thread.sleep(1000);
@@ -2233,9 +2171,8 @@ public class ITNodeTest {
triggerLeaderSnapshot(cluster, leader);
// apply something more
- for (int i = 100; i < 200; i++) {
+ for (int i = 100; i < 200; i++)
sendTestTaskAndWait(leader, i * 10, RaftError.SUCCESS);
- }
// trigger leader snapshot
triggerLeaderSnapshot(cluster, leader, 2);
@@ -2243,15 +2180,15 @@ public class ITNodeTest {
Thread.sleep(1000);
// add follower
- final PeerId newPeer = peers.get(3);
- final RaftOptions raftOptions = new RaftOptions();
+ PeerId newPeer = peers.get(3);
+ RaftOptions raftOptions = new RaftOptions();
raftOptions.setMaxByteCountPerRpc(128);
- final boolean started = cluster.start(newPeer.getEndpoint(), false, 300, false, null, raftOptions);
+ boolean started = cluster.start(newPeer.getEndpoint(), false, 300, false, null, raftOptions);
assertTrue(started);
- final CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch latch = new CountDownLatch(1);
leader.addPeer(newPeer, status -> {
- assertTrue(status.toString(), status.isOk());
+ assertTrue(status.isOk(), status.toString());
latch.countDown();
});
waitLatch(latch);
@@ -2259,36 +2196,34 @@ public class ITNodeTest {
cluster.ensureSame();
assertEquals(4, cluster.getFsms().size());
- for (final MockStateMachine fsm : cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(2000, fsm.getLogs().size());
- }
}
@Test
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-14853")
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-14853")
public void testInstallSnapshot() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unitest", this.dataPath, peers);
+ cluster = new TestCluster("unitest", dataPath, peers);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
cluster.waitLeader();
// get leader
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
// apply tasks to leader
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
cluster.ensureSame();
// stop follower1
- final List<Node> followers = cluster.getFollowers();
+ List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
- final Endpoint followerAddr = followers.get(0).getNodeId().getPeerId().getEndpoint();
+ Endpoint followerAddr = followers.get(0).getNodeId().getPeerId().getEndpoint();
assertTrue(cluster.stop(followerAddr));
// apply something more
@@ -2310,28 +2245,27 @@ public class ITNodeTest {
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
- for (final MockStateMachine fsm : cluster.getFsms()) {
- assertEquals(fsm.getAddress().toString(), 30, fsm.getLogs().size());
- }
+ for (MockStateMachine fsm : cluster.getFsms())
+ assertEquals(30, fsm.getLogs().size(), fsm.getAddress().toString());
}
@Test
public void testNoSnapshot() throws Exception {
- final Endpoint addr = new Endpoint(TestUtils.getMyIp(), TestUtils.INIT_PORT);
- final NodeOptions nodeOptions = createNodeOptions();
- final MockStateMachine fsm = new MockStateMachine(addr);
+ Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
+ NodeOptions nodeOptions = createNodeOptions();
+ MockStateMachine fsm = new MockStateMachine(addr);
nodeOptions.setFsm(fsm);
- nodeOptions.setLogUri(this.dataPath + File.separator + "log");
- nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta");
+ nodeOptions.setLogUri(dataPath + File.separator + "log");
+ nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0))));
RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions);
- final Node node = service.start();
+ Node node = service.start();
// wait node elect self as leader
Thread.sleep(2000);
- this.sendTestTaskAndWait(node);
+ sendTestTaskAndWait(node);
assertEquals(0, fsm.getSaveSnapshotTimes());
// do snapshot but returns error
@@ -2345,18 +2279,18 @@ public class ITNodeTest {
@Test
public void testAutoSnapshot() throws Exception {
- final Endpoint addr = new Endpoint(TestUtils.getMyIp(), TestUtils.INIT_PORT);
- final NodeOptions nodeOptions = createNodeOptions();
- final MockStateMachine fsm = new MockStateMachine(addr);
+ Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
+ NodeOptions nodeOptions = createNodeOptions();
+ MockStateMachine fsm = new MockStateMachine(addr);
nodeOptions.setFsm(fsm);
- nodeOptions.setLogUri(this.dataPath + File.separator + "log");
- nodeOptions.setSnapshotUri(this.dataPath + File.separator + "snapshot");
- nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta");
+ nodeOptions.setLogUri(dataPath + File.separator + "log");
+ nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
+ nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
nodeOptions.setSnapshotIntervalSecs(10);
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0))));
RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions);
- final Node node = service.start();
+ Node node = service.start();
// wait node elect self as leader
Thread.sleep(2000);
@@ -2365,8 +2299,8 @@ public class ITNodeTest {
// wait for auto snapshot
Thread.sleep(10000);
// first snapshot will be triggered randomly
- final int times = fsm.getSaveSnapshotTimes();
- assertTrue("snapshotTimes=" + times, times >= 1);
+ int times = fsm.getSaveSnapshotTimes();
+ assertTrue(times >= 1, "snapshotTimes=" + times);
assertTrue(fsm.getSnapshotIndex() > 0);
service.shutdown();
@@ -2374,23 +2308,22 @@ public class ITNodeTest {
@Test
public void testLeaderShouldNotChange() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unitest", this.dataPath, peers);
+ cluster = new TestCluster("unitest", dataPath, peers);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
cluster.waitLeader();
// get leader
- final Node leader0 = cluster.getLeader();
+ Node leader0 = cluster.getLeader();
assertNotNull(leader0);
- final long savedTerm = ((NodeImpl) leader0).getCurrentTerm();
+ long savedTerm = ((NodeImpl) leader0).getCurrentTerm();
LOG.info("Current leader is {}, term is {}", leader0, savedTerm);
Thread.sleep(5000);
cluster.waitLeader();
- final Node leader1 = cluster.getLeader();
+ Node leader1 = cluster.getLeader();
assertNotNull(leader1);
LOG.info("Current leader is {}", leader1);
assertEquals(savedTerm, ((NodeImpl) leader1).getCurrentTerm());
@@ -2398,33 +2331,32 @@ public class ITNodeTest {
@Test
public void testRecoverFollower() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unitest", this.dataPath, peers);
+ cluster = new TestCluster("unitest", dataPath, peers);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
cluster.waitLeader();
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
- final List<Node> followers = cluster.getFollowers();
+ List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
// Ensure the quorum before stopping a follower, otherwise leader can step down.
assertTrue(waitForCondition(() -> followers.get(1).getLeaderId() != null, 5_000));
- final Endpoint followerAddr = followers.get(0).getNodeId().getPeerId().getEndpoint().copy();
+ Endpoint followerAddr = followers.get(0).getNodeId().getPeerId().getEndpoint().copy();
assertTrue(cluster.stop(followerAddr));
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
for (int i = 10; i < 30; i++) {
- final ByteBuffer data = ByteBuffer.wrap(("no cluster" + i).getBytes());
- final Task task = new Task(data, null);
+ ByteBuffer data = ByteBuffer.wrap(("no cluster" + i).getBytes());
+ Task task = new Task(data, null);
leader.apply(task);
}
// wait leader to compact logs
@@ -2433,71 +2365,68 @@ public class ITNodeTest {
assertTrue(cluster.start(followerAddr));
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
- for (final MockStateMachine fsm : cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(30, fsm.getLogs().size());
- }
}
@Test
public void testLeaderTransfer() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unitest", this.dataPath, peers, 300);
+ cluster = new TestCluster("unitest", dataPath, peers, 300);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
cluster.waitLeader();
Node leader = cluster.getLeader();
assertNotNull(leader);
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
Thread.sleep(100);
- final List<Node> followers = cluster.getFollowers();
+ List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
- final PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
+ PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
LOG.info("Transfer leadership from {} to {}", leader, targetPeer);
assertTrue(leader.transferLeadershipTo(targetPeer).isOk());
Thread.sleep(1000);
cluster.waitLeader();
leader = cluster.getLeader();
- Assert.assertEquals(leader.getNodeId().getPeerId(), targetPeer);
+ assertEquals(leader.getNodeId().getPeerId(), targetPeer);
}
@Test
public void testLeaderTransferBeforeLogIsCompleted() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unitest", this.dataPath, peers, 300);
+ cluster = new TestCluster("unitest", dataPath, peers, 300);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), false, 1));
- }
cluster.waitLeader();
Node leader = cluster.getLeader();
assertNotNull(leader);
- final List<Node> followers = cluster.getFollowers();
+ List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
// Ensure the quorum before stopping a follower, otherwise leader can step down.
assertTrue(waitForCondition(() -> followers.get(1).getLeaderId() != null, 5_000));
- final PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
+ PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
assertTrue(cluster.stop(targetPeer.getEndpoint()));
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
LOG.info("Transfer leadership from {} to {}", leader, targetPeer);
assertTrue(leader.transferLeadershipTo(targetPeer).isOk());
- final CountDownLatch latch = new CountDownLatch(1);
- final Task task = new Task(ByteBuffer.wrap("aaaaa".getBytes()), new ExpectClosure(RaftError.EBUSY, latch));
+ CountDownLatch latch = new CountDownLatch(1);
+ Task task = new Task(ByteBuffer.wrap("aaaaa".getBytes()), new ExpectClosure(RaftError.EBUSY, latch));
leader.apply(task);
waitLatch(latch);
@@ -2507,35 +2436,34 @@ public class ITNodeTest {
leader = cluster.getLeader();
- Assert.assertNotEquals(targetPeer, leader.getNodeId().getPeerId());
+ assertNotEquals(targetPeer, leader.getNodeId().getPeerId());
cluster.ensureSame();
}
@Test
public void testLeaderTransferResumeOnFailure() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unitest", this.dataPath, peers, 300);
+ cluster = new TestCluster("unitest", dataPath, peers, 300);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint(), false, 1));
- }
cluster.waitLeader();
Node leader = cluster.getLeader();
assertNotNull(leader);
- final List<Node> followers = cluster.getFollowers();
+ List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
- final PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
+ PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
assertTrue(cluster.stop(targetPeer.getEndpoint()));
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
assertTrue(leader.transferLeadershipTo(targetPeer).isOk());
- final Node savedLeader = leader;
+ Node savedLeader = leader;
//try to apply task when transferring leadership
CountDownLatch latch = new CountDownLatch(1);
Task task = new Task(ByteBuffer.wrap("aaaaa".getBytes()), new ExpectClosure(RaftError.EBUSY, latch));
@@ -2568,48 +2496,49 @@ public class ITNodeTest {
this(new Endpoint(Utils.IP_ANY, 0));
}
- MockFSM1(final Endpoint address) {
+ MockFSM1(Endpoint address) {
super(address);
}
+ /** {@inheritDoc} */
@Override
- public boolean onSnapshotLoad(final SnapshotReader reader) {
+ public boolean onSnapshotLoad(SnapshotReader reader) {
return false;
}
}
@Test
public void testShutdownAndJoinWorkAfterInitFails() throws Exception {
- final Endpoint addr = new Endpoint(TestUtils.getMyIp(), TestUtils.INIT_PORT);
+ Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
{
- final NodeOptions nodeOptions = createNodeOptions();
- final MockStateMachine fsm = new MockStateMachine(addr);
+ NodeOptions nodeOptions = createNodeOptions();
+ MockStateMachine fsm = new MockStateMachine(addr);
nodeOptions.setFsm(fsm);
- nodeOptions.setLogUri(this.dataPath + File.separator + "log");
- nodeOptions.setSnapshotUri(this.dataPath + File.separator + "snapshot");
- nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta");
+ nodeOptions.setLogUri(dataPath + File.separator + "log");
+ nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
+ nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
nodeOptions.setSnapshotIntervalSecs(10);
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0))));
RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions);
- final Node node = service.start(true);
+ Node node = service.start(true);
Thread.sleep(1000);
- this.sendTestTaskAndWait(node);
+ sendTestTaskAndWait(node);
// save snapshot
- final CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch latch = new CountDownLatch(1);
node.snapshot(new ExpectClosure(latch));
waitLatch(latch);
service.shutdown();
}
{
- final NodeOptions nodeOptions = createNodeOptions();
- final MockStateMachine fsm = new MockFSM1(addr);
+ NodeOptions nodeOptions = createNodeOptions();
+ MockStateMachine fsm = new MockFSM1(addr);
nodeOptions.setFsm(fsm);
- nodeOptions.setLogUri(this.dataPath + File.separator + "log");
- nodeOptions.setSnapshotUri(this.dataPath + File.separator + "snapshot");
- nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta");
+ nodeOptions.setLogUri(dataPath + File.separator + "log");
+ nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
+ nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
nodeOptions.setSnapshotIntervalSecs(10);
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0))));
@@ -2636,19 +2565,18 @@ public class ITNodeTest {
*/
@Test
public void testShuttingDownLeaderTriggerTimeoutNow() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unitest", this.dataPath, peers, 300);
+ cluster = new TestCluster("unitest", dataPath, peers, 300);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
cluster.waitLeader();
Node leader = cluster.getLeader();
assertNotNull(leader);
- final Node oldLeader = leader;
+ Node oldLeader = leader;
LOG.info("Shutdown leader {}", leader);
leader.shutdown();
@@ -2663,26 +2591,24 @@ public class ITNodeTest {
@Test
public void testRemovingLeaderTriggerTimeoutNow() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unitest", this.dataPath, peers, 300);
+ cluster = new TestCluster("unitest", dataPath, peers, 300);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
cluster.waitLeader();
// Ensure the quorum before removing a leader, otherwise removePeer can be rejected.
- for (Node follower : cluster.getFollowers()) {
+ for (Node follower : cluster.getFollowers())
assertTrue(waitForCondition(() -> follower.getLeaderId() != null, 5_000));
- }
Node leader = cluster.getLeader();
assertNotNull(leader);
- final Node oldLeader = leader;
+ Node oldLeader = leader;
- final CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch latch = new CountDownLatch(1);
oldLeader.removePeer(oldLeader.getNodeId().getPeerId(), new ExpectClosure(latch));
waitLatch(latch);
@@ -2694,29 +2620,28 @@ public class ITNodeTest {
@Test
public void testTransferShouldWorkAfterInstallSnapshot() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unitest", this.dataPath, peers, 1000);
+ cluster = new TestCluster("unitest", dataPath, peers, 1000);
- for (int i = 0; i < peers.size() - 1; i++) {
+ for (int i = 0; i < peers.size() - 1; i++)
assertTrue(cluster.start(peers.get(i).getEndpoint()));
- }
cluster.waitLeader();
Node leader = cluster.getLeader();
assertNotNull(leader);
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
- final List<Node> followers = cluster.getFollowers();
+ List<Node> followers = cluster.getFollowers();
assertEquals(1, followers.size());
- final PeerId follower = followers.get(0).getNodeId().getPeerId();
+ PeerId follower = followers.get(0).getNodeId().getPeerId();
assertTrue(leader.transferLeadershipTo(follower).isOk());
Thread.sleep(2000);
leader = cluster.getLeader();
- Assert.assertEquals(follower, leader.getNodeId().getPeerId());
+ assertEquals(follower, leader.getNodeId().getPeerId());
CountDownLatch latch = new CountDownLatch(1);
leader.snapshot(new ExpectClosure(latch));
@@ -2726,55 +2651,53 @@ public class ITNodeTest {
waitLatch(latch);
// start the last peer which should be recover with snapshot.
- final PeerId lastPeer = peers.get(2);
+ PeerId lastPeer = peers.get(2);
assertTrue(cluster.start(lastPeer.getEndpoint()));
Thread.sleep(5000);
assertTrue(leader.transferLeadershipTo(lastPeer).isOk());
Thread.sleep(2000);
leader = cluster.getLeader();
- Assert.assertEquals(lastPeer, leader.getNodeId().getPeerId());
+ assertEquals(lastPeer, leader.getNodeId().getPeerId());
assertEquals(3, cluster.getFsms().size());
- for (final MockStateMachine fsm : cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(10, fsm.getLogs().size());
- }
}
@Test
public void testAppendEntriesWhenFollowerIsInErrorState() throws Exception {
// start five nodes
- final List<PeerId> peers = TestUtils.generatePeers(5);
+ List<PeerId> peers = TestUtils.generatePeers(5);
- cluster = new TestCluster("unitest", this.dataPath, peers, 1000);
+ cluster = new TestCluster("unitest", dataPath, peers, 1000);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
cluster.waitLeader();
- final Node oldLeader = cluster.getLeader();
+ Node oldLeader = cluster.getLeader();
assertNotNull(oldLeader);
// apply something
- this.sendTestTaskAndWait(oldLeader);
+ sendTestTaskAndWait(oldLeader);
// set one follower into error state
- final List<Node> followers = cluster.getFollowers();
+ List<Node> followers = cluster.getFollowers();
assertEquals(4, followers.size());
- final Node errorNode = followers.get(0);
- final PeerId errorPeer = errorNode.getNodeId().getPeerId().copy();
- final Endpoint errorFollowerAddr = errorPeer.getEndpoint();
+ Node errorNode = followers.get(0);
+ PeerId errorPeer = errorNode.getNodeId().getPeerId().copy();
+ Endpoint errorFollowerAddr = errorPeer.getEndpoint();
LOG.info("Set follower {} into error state", errorNode);
((NodeImpl) errorNode).onError(new RaftException(EnumOutter.ErrorType.ERROR_TYPE_STATE_MACHINE, new Status(-1,
"Follower has something wrong.")));
// increase term by stopping leader and electing a new leader again
- final Endpoint oldLeaderAddr = oldLeader.getNodeId().getPeerId().getEndpoint().copy();
+ Endpoint oldLeaderAddr = oldLeader.getNodeId().getPeerId().getEndpoint().copy();
assertTrue(cluster.stop(oldLeaderAddr));
cluster.waitLeader();
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
LOG.info("Elect a new leader {}", leader);
// apply something again
- this.sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
+ sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
// stop error follower
Thread.sleep(20);
@@ -2787,61 +2710,59 @@ public class ITNodeTest {
assertTrue(cluster.start(oldLeaderAddr));
cluster.ensureSame();
assertEquals(5, cluster.getFsms().size());
- for (final MockStateMachine fsm : cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(20, fsm.getLogs().size());
- }
}
@Test
public void testFollowerStartStopFollowing() throws Exception {
// start five nodes
- final List<PeerId> peers = TestUtils.generatePeers(5);
+ List<PeerId> peers = TestUtils.generatePeers(5);
- cluster = new TestCluster("unitest", this.dataPath, peers, 1000);
+ cluster = new TestCluster("unitest", dataPath, peers, 1000);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
cluster.waitLeader();
- final Node firstLeader = cluster.getLeader();
+ Node firstLeader = cluster.getLeader();
assertNotNull(firstLeader);
// apply something
- this.sendTestTaskAndWait(firstLeader);
+ sendTestTaskAndWait(firstLeader);
// assert follow times
- final List<Node> firstFollowers = cluster.getFollowers();
+ List<Node> firstFollowers = cluster.getFollowers();
assertEquals(4, firstFollowers.size());
- for (final Node node : firstFollowers) {
+ for (Node node : firstFollowers) {
assertTrue(waitForCondition(() -> ((MockStateMachine) node.getOptions().getFsm()).getOnStartFollowingTimes() == 1, 5_000));
assertEquals(0, ((MockStateMachine) node.getOptions().getFsm()).getOnStopFollowingTimes());
}
// stop leader and elect new one
- final Endpoint fstLeaderAddr = firstLeader.getNodeId().getPeerId().getEndpoint();
+ Endpoint fstLeaderAddr = firstLeader.getNodeId().getPeerId().getEndpoint();
assertTrue(cluster.stop(fstLeaderAddr));
cluster.waitLeader();
- final Node secondLeader = cluster.getLeader();
+ Node secondLeader = cluster.getLeader();
assertNotNull(secondLeader);
- this.sendTestTaskAndWait(secondLeader, 10, RaftError.SUCCESS);
+ sendTestTaskAndWait(secondLeader, 10, RaftError.SUCCESS);
// ensure start/stop following times
- final List<Node> secondFollowers = cluster.getFollowers();
+ List<Node> secondFollowers = cluster.getFollowers();
assertEquals(3, secondFollowers.size());
- for (final Node node : secondFollowers) {
+ for (Node node : secondFollowers) {
assertEquals(2, ((MockStateMachine) node.getOptions().getFsm()).getOnStartFollowingTimes());
assertEquals(1, ((MockStateMachine) node.getOptions().getFsm()).getOnStopFollowingTimes());
}
// transfer leadership to a follower
- final PeerId targetPeer = secondFollowers.get(0).getNodeId().getPeerId().copy();
+ PeerId targetPeer = secondFollowers.get(0).getNodeId().getPeerId().copy();
assertTrue(secondLeader.transferLeadershipTo(targetPeer).isOk());
Thread.sleep(100);
cluster.waitLeader();
- final Node thirdLeader = cluster.getLeader();
- Assert.assertEquals(targetPeer, thirdLeader.getNodeId().getPeerId());
- this.sendTestTaskAndWait(thirdLeader, 20, RaftError.SUCCESS);
+ Node thirdLeader = cluster.getLeader();
+ assertEquals(targetPeer, thirdLeader.getNodeId().getPeerId());
+ sendTestTaskAndWait(thirdLeader, 20, RaftError.SUCCESS);
- final List<Node> thirdFollowers = cluster.getFollowers();
+ List<Node> thirdFollowers = cluster.getFollowers();
assertEquals(3, thirdFollowers.size());
for (int i = 0; i < 3; i++) {
if (thirdFollowers.get(i).getNodeId().getPeerId().equals(secondLeader.getNodeId().getPeerId())) {
@@ -2861,18 +2782,17 @@ public class ITNodeTest {
@Test
public void readCommittedUserLog() throws Exception {
// setup cluster
- final List<PeerId> peers = TestUtils.generatePeers(3);
+ List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unitest", this.dataPath, peers, 1000);
+ cluster = new TestCluster("unitest", dataPath, peers, 1000);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
cluster.waitLeader();
- final Node leader = cluster.getLeader();
+ Node leader = cluster.getLeader();
assertNotNull(leader);
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
// index == 1 is a CONFIGURATION log, so real_index will be 2 when returned.
UserLog userLog = leader.readCommittedUserLog(1);
@@ -2891,8 +2811,8 @@ public class ITNodeTest {
assertNull(leader.readCommittedUserLog(15));
fail();
}
- catch (final LogIndexOutOfBoundsException e) {
- assertEquals(e.getMessage(), "Request index 15 is greater than lastAppliedIndex: 11");
+ catch (LogIndexOutOfBoundsException e) {
+ assertEquals("Request index 15 is greater than lastAppliedIndex: 11", e.getMessage());
}
// index == 0 invalid request
@@ -2900,8 +2820,8 @@ public class ITNodeTest {
assertNull(leader.readCommittedUserLog(0));
fail();
}
- catch (final LogIndexOutOfBoundsException e) {
- assertEquals(e.getMessage(), "Request index is invalid: 0");
+ catch (LogIndexOutOfBoundsException e) {
+ assertEquals("Request index is invalid: 0", e.getMessage());
}
LOG.info("Trigger leader snapshot");
CountDownLatch latch = new CountDownLatch(1);
@@ -2909,9 +2829,9 @@ public class ITNodeTest {
waitLatch(latch);
// remove and add a peer to add two CONFIGURATION logs
- final List<Node> followers = cluster.getFollowers();
+ List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
- final Node testFollower = followers.get(0);
+ Node testFollower = followers.get(0);
latch = new CountDownLatch(1);
leader.removePeer(testFollower.getNodeId().getPeerId(), new ExpectClosure(latch));
waitLatch(latch);
@@ -2919,7 +2839,7 @@ public class ITNodeTest {
leader.addPeer(testFollower.getNodeId().getPeerId(), new ExpectClosure(latch));
waitLatch(latch);
- this.sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
+ sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
// trigger leader snapshot for the second time, after this the log of index 1~11 will be deleted.
LOG.info("Trigger leader snapshot");
@@ -2933,7 +2853,7 @@ public class ITNodeTest {
leader.readCommittedUserLog(5);
fail();
}
- catch (final LogNotFoundException e) {
+ catch (LogNotFoundException e) {
assertEquals("User log is deleted at index: 5", e.getMessage());
}
@@ -2951,40 +2871,38 @@ public class ITNodeTest {
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
- for (final MockStateMachine fsm : cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms()) {
assertEquals(20, fsm.getLogs().size());
- for (int i = 0; i < 20; i++) {
+ for (int i = 0; i < 20; i++)
assertEquals("hello" + i, new String(fsm.getLogs().get(i).array()));
- }
}
}
@Test
public void testBootStrapWithSnapshot() throws Exception {
- final Endpoint addr = JRaftUtils.getEndPoint("127.0.0.1:5006");
- final MockStateMachine fsm = new MockStateMachine(addr);
+ Endpoint addr = JRaftUtils.getEndPoint("127.0.0.1:5006");
+ MockStateMachine fsm = new MockStateMachine(addr);
- for (char ch = 'a'; ch <= 'z'; ch++) {
+ for (char ch = 'a'; ch <= 'z'; ch++)
fsm.getLogs().add(ByteBuffer.wrap(new byte[] {(byte) ch}));
- }
- final BootstrapOptions opts = new BootstrapOptions();
+ BootstrapOptions opts = new BootstrapOptions();
opts.setServiceFactory(new DefaultJRaftServiceFactory());
opts.setLastLogIndex(fsm.getLogs().size());
- opts.setRaftMetaUri(this.dataPath + File.separator + "meta");
- opts.setLogUri(this.dataPath + File.separator + "log");
- opts.setSnapshotUri(this.dataPath + File.separator + "snapshot");
+ opts.setRaftMetaUri(dataPath + File.separator + "meta");
+ opts.setLogUri(dataPath + File.separator + "log");
+ opts.setSnapshotUri(dataPath + File.separator + "snapshot");
opts.setGroupConf(JRaftUtils.getConfiguration("127.0.0.1:5006"));
opts.setFsm(fsm);
- final NodeOptions nodeOpts = createNodeOptions();
+ NodeOptions nodeOpts = createNodeOptions();
opts.setNodeOptions(nodeOpts);
assertTrue(JRaftUtils.bootstrap(opts));
- nodeOpts.setRaftMetaUri(this.dataPath + File.separator + "meta");
- nodeOpts.setLogUri(this.dataPath + File.separator + "log");
- nodeOpts.setSnapshotUri(this.dataPath + File.separator + "snapshot");
+ nodeOpts.setRaftMetaUri(dataPath + File.separator + "meta");
+ nodeOpts.setLogUri(dataPath + File.separator + "log");
+ nodeOpts.setSnapshotUri(dataPath + File.separator + "snapshot");
nodeOpts.setFsm(fsm);
RaftGroupService service = createService("test", new PeerId(addr, 0), nodeOpts);
@@ -2992,15 +2910,13 @@ public class ITNodeTest {
Node node = service.start(true);
assertEquals(26, fsm.getLogs().size());
- for (int i = 0; i < 26; i++) {
+ for (int i = 0; i < 26; i++)
assertEquals('a' + i, fsm.getLogs().get(i).get());
- }
// Group configuration will be restored from snapshot meta.
- while (!node.isLeader()) {
+ while (!node.isLeader())
Thread.sleep(20);
- }
- this.sendTestTaskAndWait(node);
+ sendTestTaskAndWait(node);
assertEquals(36, fsm.getLogs().size());
}
finally {
@@ -3010,34 +2926,33 @@ public class ITNodeTest {
@Test
public void testBootStrapWithoutSnapshot() throws Exception {
- final Endpoint addr = JRaftUtils.getEndPoint("127.0.0.1:5006");
- final MockStateMachine fsm = new MockStateMachine(addr);
+ Endpoint addr = JRaftUtils.getEndPoint("127.0.0.1:5006");
+ MockStateMachine fsm = new MockStateMachine(addr);
- final BootstrapOptions opts = new BootstrapOptions();
+ BootstrapOptions opts = new BootstrapOptions();
opts.setServiceFactory(new DefaultJRaftServiceFactory());
opts.setLastLogIndex(0);
- opts.setRaftMetaUri(this.dataPath + File.separator + "meta");
- opts.setLogUri(this.dataPath + File.separator + "log");
- opts.setSnapshotUri(this.dataPath + File.separator + "snapshot");
+ opts.setRaftMetaUri(dataPath + File.separator + "meta");
+ opts.setLogUri(dataPath + File.separator + "log");
+ opts.setSnapshotUri(dataPath + File.separator + "snapshot");
opts.setGroupConf(JRaftUtils.getConfiguration("127.0.0.1:5006"));
opts.setFsm(fsm);
- final NodeOptions nodeOpts = createNodeOptions();
+ NodeOptions nodeOpts = createNodeOptions();
opts.setNodeOptions(nodeOpts);
assertTrue(JRaftUtils.bootstrap(opts));
- nodeOpts.setRaftMetaUri(this.dataPath + File.separator + "meta");
- nodeOpts.setLogUri(this.dataPath + File.separator + "log");
- nodeOpts.setSnapshotUri(this.dataPath + File.separator + "snapshot");
+ nodeOpts.setRaftMetaUri(dataPath + File.separator + "meta");
+ nodeOpts.setLogUri(dataPath + File.separator + "log");
+ nodeOpts.setSnapshotUri(dataPath + File.separator + "snapshot");
nodeOpts.setFsm(fsm);
RaftGroupService service = createService("test", new PeerId(addr, 0), nodeOpts);
try {
Node node = service.start(true);
- while (!node.isLeader()) {
+ while (!node.isLeader())
Thread.sleep(20);
- }
- this.sendTestTaskAndWait(node);
+ sendTestTaskAndWait(node);
assertEquals(10, fsm.getLogs().size());
}
finally {
@@ -3046,65 +2961,65 @@ public class ITNodeTest {
}
@Test
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-14852")
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-14852")
public void testChangePeers() throws Exception {
- final PeerId peer0 = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT);
- cluster = new TestCluster("testChangePeers", this.dataPath, Collections.singletonList(peer0));
+ PeerId peer0 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
+ cluster = new TestCluster("testChangePeers", dataPath, Collections.singletonList(peer0));
assertTrue(cluster.start(peer0.getEndpoint()));
cluster.waitLeader();
Node leader = cluster.getLeader();
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
for (int i = 1; i < 10; i++) {
- final PeerId peer = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + i);
+ PeerId peer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + i);
assertTrue(cluster.start(peer.getEndpoint(), false, 300));
}
for (int i = 0; i < 9; i++) {
cluster.waitLeader();
leader = cluster.getLeader();
assertNotNull(leader);
- PeerId peer = new PeerId(TestUtils.getMyIp(), peer0.getEndpoint().getPort() + i);
- Assert.assertEquals(peer, leader.getNodeId().getPeerId());
- peer = new PeerId(TestUtils.getMyIp(), peer0.getEndpoint().getPort() + i + 1);
- final SynchronizedClosure done = new SynchronizedClosure();
+ PeerId peer = new PeerId(TestUtils.getLocalAddress(), peer0.getEndpoint().getPort() + i);
+ assertEquals(peer, leader.getNodeId().getPeerId());
+ peer = new PeerId(TestUtils.getLocalAddress(), peer0.getEndpoint().getPort() + i + 1);
+ SynchronizedClosure done = new SynchronizedClosure();
leader.changePeers(new Configuration(Collections.singletonList(peer)), done);
Status status = done.await();
- assertTrue(status.getRaftError().toString(), status.isOk());
+ assertTrue(status.isOk(), status.getRaftError().toString());
}
cluster.ensureSame();
}
@Test
public void testChangePeersAddMultiNodes() throws Exception {
- final PeerId peer0 = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT);
- cluster = new TestCluster("testChangePeers", this.dataPath, Collections.singletonList(peer0));
+ PeerId peer0 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
+ cluster = new TestCluster("testChangePeers", dataPath, Collections.singletonList(peer0));
assertTrue(cluster.start(peer0.getEndpoint()));
cluster.waitLeader();
- final Node leader = cluster.getLeader();
- this.sendTestTaskAndWait(leader);
+ Node leader = cluster.getLeader();
+ sendTestTaskAndWait(leader);
- final Configuration conf = new Configuration();
+ Configuration conf = new Configuration();
for (int i = 0; i < 3; i++) {
- final PeerId peer = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + i);
+ PeerId peer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + i);
conf.addPeer(peer);
}
- PeerId peer = new PeerId(TestUtils.getMyIp(), peer0.getEndpoint().getPort() + 1);
+ PeerId peer = new PeerId(TestUtils.getLocalAddress(), peer0.getEndpoint().getPort() + 1);
// fail, because the peers are not started.
- final SynchronizedClosure done = new SynchronizedClosure();
+ SynchronizedClosure done = new SynchronizedClosure();
leader.changePeers(new Configuration(Collections.singletonList(peer)), done);
- Assert.assertEquals(RaftError.ECATCHUP, done.await().getRaftError());
+ assertEquals(RaftError.ECATCHUP, done.await().getRaftError());
// start peer1
assertTrue(cluster.start(peer.getEndpoint()));
// still fail, because peer2 is not started
done.reset();
leader.changePeers(conf, done);
- Assert.assertEquals(RaftError.ECATCHUP, done.await().getRaftError());
+ assertEquals(RaftError.ECATCHUP, done.await().getRaftError());
// start peer2
- peer = new PeerId(TestUtils.getMyIp(), peer0.getEndpoint().getPort() + 2);
+ peer = new PeerId(TestUtils.getLocalAddress(), peer0.getEndpoint().getPort() + 2);
assertTrue(cluster.start(peer.getEndpoint()));
done.reset();
// works
@@ -3113,29 +3028,28 @@ public class ITNodeTest {
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
- for (final MockStateMachine fsm : cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms())
assertEquals(10, fsm.getLogs().size());
- }
}
@Test
public void testChangePeersStepsDownInJointConsensus() throws Exception {
- final List<PeerId> peers = new ArrayList<>();
+ List<PeerId> peers = new ArrayList<>();
- final PeerId peer0 = JRaftUtils.getPeerId(TestUtils.getMyIp() + ":5006");
- final PeerId peer1 = JRaftUtils.getPeerId(TestUtils.getMyIp() + ":5007");
- final PeerId peer2 = JRaftUtils.getPeerId(TestUtils.getMyIp() + ":5008");
- final PeerId peer3 = JRaftUtils.getPeerId(TestUtils.getMyIp() + ":5009");
+ PeerId peer0 = JRaftUtils.getPeerId(TestUtils.getLocalAddress() + ":5006");
+ PeerId peer1 = JRaftUtils.getPeerId(TestUtils.getLocalAddress() + ":5007");
+ PeerId peer2 = JRaftUtils.getPeerId(TestUtils.getLocalAddress() + ":5008");
+ PeerId peer3 = JRaftUtils.getPeerId(TestUtils.getLocalAddress() + ":5009");
// start single cluster
peers.add(peer0);
- cluster = new TestCluster("testChangePeersStepsDownInJointConsensus", this.dataPath, peers);
+ cluster = new TestCluster("testChangePeersStepsDownInJointConsensus", dataPath, peers);
assertTrue(cluster.start(peer0.getEndpoint()));
cluster.waitLeader();
Node leader = cluster.getLeader();
assertNotNull(leader);
- this.sendTestTaskAndWait(leader);
+ sendTestTaskAndWait(leader);
// start peer1-3
assertTrue(cluster.start(peer1.getEndpoint()));
@@ -3145,14 +3059,14 @@ public class ITNodeTest {
// Make sure the topology is ready before adding peers.
assertTrue(waitForTopology(cluster, leader.getNodeId().getPeerId().getEndpoint(), 4, 3_000));
- final Configuration conf = new Configuration();
+ Configuration conf = new Configuration();
conf.addPeer(peer0);
conf.addPeer(peer1);
conf.addPeer(peer2);
conf.addPeer(peer3);
// change peers
- final SynchronizedClosure done = new SynchronizedClosure();
+ SynchronizedClosure done = new SynchronizedClosure();
leader.changePeers(conf, done);
assertTrue(done.await().isOk());
@@ -3165,7 +3079,7 @@ public class ITNodeTest {
// Change peers to [peer2, peer3], which must fail since peer3 is stopped
done.reset();
leader.changePeers(conf, done);
- Assert.assertEquals(RaftError.EPERM, done.await().getRaftError());
+ assertEquals(RaftError.EPERM, done.await().getRaftError());
LOG.info(done.getStatus().toString());
assertFalse(((NodeImpl) leader).getConf().isStable());
@@ -3177,7 +3091,7 @@ public class ITNodeTest {
Thread.sleep(1000);
cluster.waitLeader();
leader = cluster.getLeader();
- final List<PeerId> thePeers = leader.listPeers();
+ List<PeerId> thePeers = leader.listPeers();
assertTrue(!thePeers.isEmpty());
assertEquals(conf.getPeerSet(), new HashSet<>(thePeers));
}
@@ -3188,8 +3102,8 @@ public class ITNodeTest {
volatile boolean stop;
boolean dontRemoveFirstPeer;
- ChangeArg(final TestCluster c, final List<PeerId> peers, final boolean stop,
- final boolean dontRemoveFirstPeer) {
+ ChangeArg(TestCluster c, List<PeerId> peers, boolean stop,
+ boolean dontRemoveFirstPeer) {
super();
this.c = c;
this.peers = peers;
@@ -3199,8 +3113,8 @@ public class ITNodeTest {
}
- private Future<?> startChangePeersThread(final ChangeArg arg) {
- final Set<RaftError> expectedErrors = new HashSet<>();
+ private Future<?> startChangePeersThread(ChangeArg arg) {
+ Set<RaftError> expectedErrors = new HashSet<>();
expectedErrors.add(RaftError.EBUSY);
expectedErrors.add(RaftError.EPERM);
expectedErrors.add(RaftError.ECATCHUP);
@@ -3211,33 +3125,29 @@ public class ITNodeTest {
try {
while (!arg.stop) {
arg.c.waitLeader();
- final Node leader = arg.c.getLeader();
- if (leader == null) {
+ Node leader = arg.c.getLeader();
+ if (leader == null)
continue;
- }
// select peers in random
- final Configuration conf = new Configuration();
- if (arg.dontRemoveFirstPeer) {
+ Configuration conf = new Configuration();
+ if (arg.dontRemoveFirstPeer)
conf.addPeer(arg.peers.get(0));
- }
for (int i = 0; i < arg.peers.size(); i++) {
- final boolean select = ThreadLocalRandom.current().nextInt(64) < 32;
- if (select && !conf.contains(arg.peers.get(i))) {
+ boolean select = ThreadLocalRandom.current().nextInt(64) < 32;
+ if (select && !conf.contains(arg.peers.get(i)))
conf.addPeer(arg.peers.get(i));
- }
}
if (conf.isEmpty()) {
LOG.warn("No peer has been selected");
continue;
}
- final SynchronizedClosure done = new SynchronizedClosure();
+ SynchronizedClosure done = new SynchronizedClosure();
leader.changePeers(conf, done);
done.await();
- assertTrue(done.getStatus().toString(),
- done.getStatus().isOk() || expectedErrors.contains(done.getStatus().getRaftError()));
+ assertTrue(done.getStatus().isOk() || expectedErrors.contains(done.getStatus().getRaftError()), done.getStatus().toString());
}
}
- catch (final InterruptedException e) {
+ catch (InterruptedException e) {
LOG.error("ChangePeersThread is interrupted", e);
}
});
@@ -3246,101 +3156,94 @@ public class ITNodeTest {
@Test
public void testChangePeersChaosWithSnapshot() throws Exception {
// start cluster
- final List<PeerId> peers = new ArrayList<>();
- peers.add(new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT));
- cluster = new TestCluster("unittest", this.dataPath, peers, 1000);
+ List<PeerId> peers = new ArrayList<>();
+ peers.add(new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT));
+ cluster = new TestCluster("unittest", dataPath, peers, 1000);
assertTrue(cluster.start(peers.get(0).getEndpoint(), false, 2));
// start other peers
for (int i = 1; i < 10; i++) {
- final PeerId peer = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + i);
+ PeerId peer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + i);
peers.add(peer);
assertTrue(cluster.start(peer.getEndpoint()));
}
- final ChangeArg arg = new ChangeArg(cluster, peers, false, false);
+ ChangeArg arg = new ChangeArg(cluster, peers, false, false);
- final Future<?> future = startChangePeersThread(arg);
+ Future<?> future = startChangePeersThread(arg);
for (int i = 0; i < 5000; ) {
cluster.waitLeader();
- final Node leader = cluster.getLeader();
- if (leader == null) {
+ Node leader = cluster.getLeader();
+ if (leader == null)
continue;
- }
- final SynchronizedClosure done = new SynchronizedClosure();
- final Task task = new Task(ByteBuffer.wrap(("hello" + i).getBytes()), done);
+ SynchronizedClosure done = new SynchronizedClosure();
+ Task task = new Task(ByteBuffer.wrap(("hello" + i).getBytes()), done);
leader.apply(task);
- final Status status = done.await();
+ Status status = done.await();
if (status.isOk()) {
- if (++i % 100 == 0) {
+ if (++i % 100 == 0)
System.out.println("Progress:" + i);
- }
}
- else {
+ else
assertEquals(RaftError.EPERM, status.getRaftError());
- }
}
arg.stop = true;
future.get();
cluster.waitLeader();
- final SynchronizedClosure done = new SynchronizedClosure();
- final Node leader = cluster.getLeader();
+ SynchronizedClosure done = new SynchronizedClosure();
+ Node leader = cluster.getLeader();
leader.changePeers(new Configuration(peers), done);
- final Status st = done.await();
- assertTrue(st.getErrorMsg(), st.isOk());
+ Status st = done.await();
+ assertTrue(st.isOk(), st.getErrorMsg());
cluster.ensureSame();
assertEquals(10, cluster.getFsms().size());
- for (final MockStateMachine fsm : cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms())
assertTrue(fsm.getLogs().size() >= 5000);
- }
}
@Test
public void testChangePeersChaosWithoutSnapshot() throws Exception {
// start cluster
- final List<PeerId> peers = new ArrayList<>();
- peers.add(new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT));
- cluster = new TestCluster("unittest", this.dataPath, peers, 1000);
+ List<PeerId> peers = new ArrayList<>();
+ peers.add(new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT));
+ cluster = new TestCluster("unittest", dataPath, peers, 1000);
assertTrue(cluster.start(peers.get(0).getEndpoint(), false, 100000));
// start other peers
for (int i = 1; i < 10; i++) {
- final PeerId peer = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + i);
+ PeerId peer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + i);
peers.add(peer);
assertTrue(cluster.start(peer.getEndpoint(), false, 10000));
}
- final ChangeArg arg = new ChangeArg(cluster, peers, false, true);
+ ChangeArg arg = new ChangeArg(cluster, peers, false, true);
- final Future<?> future = startChangePeersThread(arg);
+ Future<?> future = startChangePeersThread(arg);
final int tasks = 5000;
for (int i = 0; i < tasks; ) {
cluster.waitLeader();
- final Node leader = cluster.getLeader();
- if (leader == null) {
+ Node leader = cluster.getLeader();
+ if (leader == null)
continue;
- }
- final SynchronizedClosure done = new SynchronizedClosure();
- final Task task = new Task(ByteBuffer.wrap(("hello" + i).getBytes()), done);
+ SynchronizedClosure done = new SynchronizedClosure();
+ Task task = new Task(ByteBuffer.wrap(("hello" + i).getBytes()), done);
leader.apply(task);
- final Status status = done.await();
+ Status status = done.await();
if (status.isOk()) {
- if (++i % 100 == 0) {
+ if (++i % 100 == 0)
System.out.println("Progress:" + i);
- }
}
- else {
+ else
assertEquals(RaftError.EPERM, status.getRaftError());
- }
}
arg.stop = true;
future.get();
cluster.waitLeader();
- final SynchronizedClosure done = new SynchronizedClosure();
- final Node leader = cluster.getLeader();
+ SynchronizedClosure done = new SynchronizedClosure();
+ Node leader = cluster.getLeader();
leader.changePeers(new Configuration(peers), done);
assertTrue(done.await().isOk());
cluster.ensureSame();
assertEquals(10, cluster.getFsms().size());
- for (final MockStateMachine fsm : cluster.getFsms()) {
+ for (MockStateMachine fsm : cluster.getFsms()) {
assertTrue(fsm.getLogs().size() >= tasks);
assertTrue(fsm.getLogs().size() - tasks < 100);
}
@@ -3349,26 +3252,26 @@ public class ITNodeTest {
@Test
public void testChangePeersChaosApplyTasks() throws Exception {
// start cluster
- final List<PeerId> peers = new ArrayList<>();
- peers.add(new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT));
- cluster = new TestCluster("unittest", this.dataPath, peers, 1000);
+ List<PeerId> peers = new ArrayList<>();
+ peers.add(new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT));
+ cluster = new TestCluster("unittest", dataPath, peers, 1000);
assertTrue(cluster.start(peers.get(0).getEndpoint(), false, 100000));
// start other peers
for (int i = 1; i < 10; i++) {
- final PeerId peer = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + i);
+ PeerId peer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + i);
peers.add(peer);
assertTrue(cluster.start(peer.getEndpoint(), false, 100000));
}
final int threads = 3;
- final List<ChangeArg> args = new ArrayList<>();
- final List<Future<?>> futures = new ArrayList<>();
- final CountDownLatch latch = new CountDownLatch(threads);
+ List<ChangeArg> args = new ArrayList<>();
+ List<Future<?>> futures = new ArrayList<>();
+ CountDownLatch latch = new CountDownLatch(threads);
Executor executor = Executors.newFixedThreadPool(threads);
for (int t = 0; t < threads; t++) {
- final ChangeArg arg = new ChangeArg(cluster, peers, false, true);
+ ChangeArg arg = new ChangeArg(cluster, peers, false, true);
args.add(arg);
futures.add(startChangePeersThread(arg));
@@ -3376,25 +3279,22 @@ public class ITNodeTest {
try {
for (int i = 0; i < 5000; ) {
cluster.waitLeader();
- final Node leader = cluster.getLeader();
- if (leader == null) {
+ Node leader = cluster.getLeader();
+ if (leader == null)
continue;
- }
- final SynchronizedClosure done = new SynchronizedClosure();
- final Task task = new Task(ByteBuffer.wrap(("hello" + i).getBytes()), done);
+ SynchronizedClosure done = new SynchronizedClosure();
+ Task task = new Task(ByteBuffer.wrap(("hello" + i).getBytes()), done);
leader.apply(task);
- final Status status = done.await();
+ Status status = done.await();
if (status.isOk()) {
- if (++i % 100 == 0) {
+ if (++i % 100 == 0)
System.out.println("Progress:" + i);
- }
}
- else {
+ else
assertEquals(RaftError.EPERM, status.getRaftError());
- }
}
}
- catch (final Exception e) {
+ catch (Exception e) {
LOG.error("Failed to run tasks", e);
}
finally {
@@ -3404,36 +3304,33 @@ public class ITNodeTest {
}
latch.await();
- for (final ChangeArg arg : args) {
+ for (ChangeArg arg : args)
arg.stop = true;
- }
- for (final Future<?> future : futures) {
+ for (Future<?> future : futures)
future.get();
- }
cluster.waitLeader();
- final SynchronizedClosure done = new SynchronizedClosure();
- final Node leader = cluster.getLeader();
+ SynchronizedClosure done = new SynchronizedClosure();
+ Node leader = cluster.getLeader();
leader.changePeers(new Configuration(peers), done);
assertTrue(done.await().isOk());
cluster.ensureSame();
assertEquals(10, cluster.getFsms().size());
- for (final MockStateMachine fsm : cluster.getFsms()) {
- final int logSize = fsm.getLogs().size();
- assertTrue("logSize= " + logSize, logSize >= 5000 * threads);
- assertTrue("logSize= " + logSize, logSize - 5000 * threads < 100);
+ for (MockStateMachine fsm : cluster.getFsms()) {
+ int logSize = fsm.getLogs().size();
+ assertTrue(logSize >= 5000 * threads, "logSize= " + logSize);
+ assertTrue(logSize - 5000 * threads < 100, "logSize= " + logSize);
}
}
@Test
public void testBlockedElection() throws Exception {
- final List<PeerId> peers = TestUtils.generatePeers(3);
- cluster = new TestCluster("unittest", this.dataPath, peers);
+ List<PeerId> peers = TestUtils.generatePeers(3);
+ cluster = new TestCluster("unittest", dataPath, peers);
- for (final PeerId peer : peers) {
+ for (PeerId peer : peers)
assertTrue(cluster.start(peer.getEndpoint()));
- }
cluster.waitLeader();
@@ -3447,10 +3344,10 @@ public class ITNodeTest {
NodeImpl follower0 = (NodeImpl) follower;
DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService();
RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient();
- rpcClientEx.blockMessages(new BiPredicate<Object, String>() {
+ rpcClientEx.blockMessages(new BiPredicate<>() {
@Override public boolean test(Object msg, String nodeId) {
if (msg instanceof RpcRequests.RequestVoteRequest) {
- RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest) msg;
+ RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest)msg;
return !msg0.getPreVote();
}
@@ -3484,7 +3381,7 @@ public class ITNodeTest {
}
private NodeOptions createNodeOptions() {
- final NodeOptions options = new NodeOptions();
+ NodeOptions options = new NodeOptions();
options.setCommonExecutor(JRaftUtils.createCommonExecutor(options));
options.setStripedExecutor(JRaftUtils.createAppendEntriesExecutor(options));
@@ -3563,16 +3460,14 @@ public class ITNodeTest {
Configuration initialConf = nodeOptions.getInitialConf();
if (initialConf != null) {
- for (PeerId id : initialConf.getPeers()) {
+ for (PeerId id : initialConf.getPeers())
servers.add(id.getEndpoint().toString());
- }
- for (PeerId id : initialConf.getLearners()) {
+ for (PeerId id : initialConf.getLearners())
servers.add(id.getEndpoint().toString());
- }
}
- final IgniteRpcServer rpcServer = new TestIgniteRpcServer(peerId.getEndpoint(), servers, nodeManager);
+ IgniteRpcServer rpcServer = new TestIgniteRpcServer(peerId.getEndpoint(), servers, nodeManager);
nodeOptions.setRpcClient(new IgniteRpcClient(rpcServer.clusterService(), true));
return new RaftGroupService(groupId, peerId, nodeOptions, rpcServer, nodeManager);
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
index b639566..06c949b 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
@@ -25,11 +25,10 @@ import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.ClusterServiceFactory;
-import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.exception.RaftException;
@@ -39,19 +38,18 @@ import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Utils;
-import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
+import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
-import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
@@ -68,9 +66,6 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
*/
private static final RaftClientMessagesFactory FACTORY = new RaftClientMessagesFactory();
- /** Network factory. */
- private static final ClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
-
/**
* Counter group name 0.
*/
@@ -95,14 +90,14 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
* Initial configuration.
*/
private static final List<Peer> INITIAL_CONF = List.of(
- new Peer(TestUtils.getMyIp() + ":" + PORT),
- new Peer(TestUtils.getMyIp() + ":" + (PORT + 1)),
- new Peer(TestUtils.getMyIp() + ":" + (PORT + 2)));
+ new Peer(getLocalAddress() + ":" + PORT),
+ new Peer(getLocalAddress() + ":" + (PORT + 1)),
+ new Peer(getLocalAddress() + ":" + (PORT + 2)));
/**
* Listener factory.
*/
- private Supplier<CounterListener> listenerFactory = () -> new CounterListener();
+ private Supplier<CounterListener> listenerFactory = CounterListener::new;
/**
* Servers list.
@@ -119,27 +114,28 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
*/
private String dataPath;
+ /** */
@BeforeEach
void before(TestInfo testInfo) {
LOG.info(">>>>>>>>>>>>>>> Start test method: " + testInfo.getTestMethod().orElseThrow().getName());
- this.dataPath = TestUtils.mkTempDir();
+ dataPath = TestUtils.mkTempDir();
}
+ /** */
@AfterEach
void after(TestInfo testInfo) throws Exception {
LOG.info("Start client shutdown");
- for (RaftGroupService client : clients) {
+ for (RaftGroupService client : clients)
client.shutdown();
- }
LOG.info("Start server shutdown servers={}", servers.size());
for (RaftServer server : servers)
server.shutdown();
- assertTrue("Failed to delete " + this.dataPath, Utils.delete(new File(this.dataPath)));
+ assertTrue(Utils.delete(new File(dataPath)), "Failed to delete " + dataPath);
LOG.info(">>>>>>>>>>>>>>> End test method: " + testInfo.getTestMethod().orElseThrow().getName());
}
@@ -148,9 +144,9 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
* @param idx The index.
* @return Raft server instance.
*/
- protected JRaftServerImpl startServer(int idx, Consumer<RaftServer> clo) {
+ private JRaftServerImpl startServer(int idx, Consumer<RaftServer> clo) {
ClusterService service = clusterService("server" + idx, PORT + idx,
- List.of(TestUtils.getMyIp() + ":" + PORT), false);
+ List.of(getLocalAddress() + ":" + PORT), false);
JRaftServerImpl server = new JRaftServerImpl(service, dataPath, FACTORY, false);
@@ -167,8 +163,8 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
* @param groupId Group id.
* @return The client.
*/
- protected RaftGroupService startClient(String groupId) {
- String addr = TestUtils.getMyIp() + ":" + PORT;
+ private RaftGroupService startClient(String groupId) {
+ String addr = getLocalAddress() + ":" + PORT;
ClusterService clientNode1 = clusterService("client_" + groupId + "_", CLIENT_PORT + clients.size(),
List.of(addr), false);
@@ -186,11 +182,9 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
*/
private void startCluster() {
for (int i = 0; i < 3; i++) {
- startServer(i, new Consumer<RaftServer>() {
- @Override public void accept(RaftServer raftServer) {
- raftServer.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF);
- raftServer.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF);
- }
+ startServer(i, raftServer -> {
+ raftServer.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF);
+ raftServer.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF);
});
}
@@ -310,7 +304,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
fail();
}
catch (Exception e) {
- Assertions.assertTrue(e.getCause() instanceof RaftException);
+ assertTrue(e.getCause() instanceof RaftException);
}
}
@@ -342,7 +336,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
fail();
}
catch (Exception e) {
- Assertions.assertTrue(e.getCause() instanceof RaftException);
+ assertTrue(e.getCause() instanceof RaftException);
}
}
@@ -398,14 +392,14 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
// Expected.
Throwable cause = e.getCause();
- Assertions.assertTrue(cause instanceof RaftException);
+ assertTrue(cause instanceof RaftException);
}
try {
client1.<Long>run(new IncrementAndGetCommand(11)).get();
}
catch (Exception e) {
- Assertions.assertTrue(e.getCause() instanceof TimeoutException, "New leader should not get elected");
+ assertTrue(e.getCause() instanceof TimeoutException, "New leader should not get elected");
}
}
@@ -526,7 +520,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
* @return The counter value.
* @throws Exception If failed.
*/
- private long applyIncrements(RaftGroupService client, int start, int stop) throws Exception {
+ private static long applyIncrements(RaftGroupService client, int start, int stop) throws Exception {
long val = 0;
for (int i = start; i <= stop; i++) {
@@ -544,7 +538,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
* @param until Until value.
* @return The sum.
*/
- public long sum(long until) {
+ private static long sum(long until) {
return (1 + until) * until / 2;
}
@@ -554,7 +548,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
* @param groupId Group id.
* @return Validation result.
*/
- private boolean validateStateMachine(long expected, JRaftServerImpl server, String groupId) {
+ private static boolean validateStateMachine(long expected, JRaftServerImpl server, String groupId) {
org.apache.ignite.raft.jraft.RaftGroupService svc = server.raftGroupService(groupId);
JRaftServerImpl.DelegatingStateMachine fsm0 =
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
index 278c03d..01cd57f 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
@@ -16,8 +16,6 @@
*/
package org.apache.ignite.raft.jraft.rpc;
-import java.net.Inet4Address;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
@@ -28,6 +26,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.Utils;
import org.junit.jupiter.api.AfterEach;
@@ -52,8 +51,8 @@ public abstract class AbstractRpcTest {
private final List<RpcClient> clients = new ArrayList<>();
@BeforeEach
- public void setup() throws UnknownHostException {
- endpoint = new Endpoint(Inet4Address.getLocalHost().getHostAddress(), INIT_PORT);
+ public void setup() {
+ endpoint = new Endpoint(TestUtils.getLocalAddress(), INIT_PORT);
RpcServer<?> server = createServer(endpoint);
server.registerProcessor(new Request1RpcProcessor());
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
index ed101d6..001d1bd 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
@@ -19,18 +19,16 @@ package org.apache.ignite.raft.jraft.test;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
-import java.net.Inet4Address;
import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BooleanSupplier;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
@@ -47,7 +45,7 @@ import static java.lang.Thread.sleep;
* Test helper
*/
public class TestUtils {
- public static ConfigurationEntry getConfEntry(final String confStr, final String oldConfStr) {
+ public static ConfigurationEntry getConfEntry(String confStr, String oldConfStr) {
ConfigurationEntry entry = new ConfigurationEntry();
entry.setConf(JRaftUtils.getConfiguration(confStr));
entry.setOldConf(JRaftUtils.getConfiguration(oldConfStr));
@@ -57,9 +55,8 @@ public class TestUtils {
public static void dumpThreads() {
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
ThreadInfo[] infos = bean.dumpAllThreads(true, true);
- for (ThreadInfo info : infos) {
+ for (ThreadInfo info : infos)
System.out.println(info);
- }
}
public static String mkTempDir() {
@@ -70,11 +67,11 @@ public class TestUtils {
return path.toString();
}
- public static LogEntry mockEntry(final int index, final int term) {
+ public static LogEntry mockEntry(int index, int term) {
return mockEntry(index, term, 0);
}
- public static LogEntry mockEntry(final int index, final int term, final int dataSize) {
+ public static LogEntry mockEntry(int index, int term, int dataSize) {
LogEntry entry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_NO_OP);
entry.setId(new LogId(index, term));
if (dataSize > 0) {
@@ -89,65 +86,50 @@ public class TestUtils {
return mockEntries(10);
}
- public static String getMyIp() {
- String ip = null;
+ /**
+ * Returns the localhost IP address.
+ *
+ * @return localhost IP address
+ */
+ public static String getLocalAddress() {
try {
- Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
- while (interfaces.hasMoreElements()) {
- NetworkInterface iface = interfaces.nextElement();
- // filters out 127.0.0.1 and inactive interfaces
- if (iface.isLoopback() || !iface.isUp()) {
- continue;
- }
-
- Enumeration<InetAddress> addresses = iface.getInetAddresses();
- while (addresses.hasMoreElements()) {
- InetAddress addr = addresses.nextElement();
- if (addr instanceof Inet4Address) {
- ip = addr.getHostAddress();
- break;
- }
- }
- }
- return ip;
+ return InetAddress.getLocalHost().getHostAddress();
}
- catch (SocketException e) {
- return "localhost";
+ catch (UnknownHostException e) {
+ throw new IgniteInternalException(e);
}
}
- public static List<LogEntry> mockEntries(final int n) {
+ public static List<LogEntry> mockEntries(int n) {
List<LogEntry> entries = new ArrayList<>();
for (int i = 0; i < n; i++) {
LogEntry entry = mockEntry(i, i);
- if (i > 0) {
+ if (i > 0)
entry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes()));
- }
entries.add(entry);
}
return entries;
}
public static RpcRequests.PingRequest createPingRequest() {
- RpcRequests.PingRequest reqObject = RpcRequests.PingRequest.newBuilder()
- .setSendTimestamp(System.currentTimeMillis()).build();
- return reqObject;
+ return RpcRequests.PingRequest.newBuilder()
+ .setSendTimestamp(System.currentTimeMillis())
+ .build();
}
public static final int INIT_PORT = 5003;
- public static List<PeerId> generatePeers(final int n) {
+ public static List<PeerId> generatePeers(int n) {
List<PeerId> ret = new ArrayList<>();
- for (int i = 0; i < n; i++) {
- ret.add(new PeerId(getMyIp(), INIT_PORT + i));
- }
+ for (int i = 0; i < n; i++)
+ ret.add(new PeerId(getLocalAddress(), INIT_PORT + i));
return ret;
}
- public static List<PeerId> generatePriorityPeers(final int n, final List<Integer> priorities) {
+ public static List<PeerId> generatePriorityPeers(int n, List<Integer> priorities) {
List<PeerId> ret = new ArrayList<>();
for (int i = 0; i < n; i++) {
- Endpoint endpoint = new Endpoint(getMyIp(), INIT_PORT + i);
+ Endpoint endpoint = new Endpoint(getLocalAddress(), INIT_PORT + i);
PeerId peerId = new PeerId(endpoint, 0, priorities.get(i));
ret.add(peerId);
}
@@ -155,7 +137,7 @@ public class TestUtils {
}
public static byte[] getRandomBytes() {
- final byte[] requestContext = new byte[ThreadLocalRandom.current().nextInt(10) + 1];
+ byte[] requestContext = new byte[ThreadLocalRandom.current().nextInt(10) + 1];
ThreadLocalRandom.current().nextBytes(requestContext);
return requestContext;
}