You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/06/21 13:04:39 UTC
[ignite-3] branch main updated: IGNITE-14853 Stabilize flaky tests
in jraft module - Fixes #176.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7d42f95 IGNITE-14853 Stabilize flaky tests in jraft module - Fixes #176.
7d42f95 is described below
commit 7d42f95c19c5642d8e00a664aec154a2faa5a357
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Mon Jun 21 16:01:24 2021 +0300
IGNITE-14853 Stabilize flaky tests in jraft module - Fixes #176.
Signed-off-by: Alexey Scherbakov <al...@gmail.com>
---
.../internal/metastorage/MetaStorageManager.java | 2 +-
.../apache/ignite/network/MessagingService.java | 10 +
.../ignite/network/NetworkMessageHandler.java | 5 +-
.../scalecube/ITScaleCubeNetworkMessagingTest.java | 13 +-
.../scalecube/ScaleCubeClusterServiceFactory.java | 2 +-
.../scalecube/ScaleCubeMessagingService.java | 28 +-
.../client/service/impl/RaftGroupServiceImpl.java | 1 +
.../ignite/raft/jraft/core/ITCliServiceTest.java | 53 ++--
.../apache/ignite/raft/jraft/core/ITNodeTest.java | 297 ++++++++++++---------
.../raft/server/ITJRaftCounterServerTest.java | 21 +-
.../internal/raft/server/impl/JRaftServerImpl.java | 2 +-
.../internal/raft/server/impl/RaftServerImpl.java | 15 +-
.../apache/ignite/raft/jraft/RaftGroupService.java | 37 +--
.../apache/ignite/raft/jraft/core/NodeImpl.java | 11 +-
.../ignite/raft/jraft/error/RemotingException.java | 1 -
.../apache/ignite/raft/jraft/rpc/RpcClient.java | 52 +---
.../ignite/raft/jraft/rpc/RpcRequestClosure.java | 1 +
.../ignite/raft/jraft/rpc/RpcRequestProcessor.java | 1 +
.../raft/jraft/rpc/RpcResponseClosureAdapter.java | 4 -
.../raft/jraft/rpc/impl/AbstractClientService.java | 47 ++--
.../raft/jraft/rpc/impl/IgniteRpcClient.java | 63 +----
.../raft/jraft/rpc/impl/IgniteRpcServer.java | 8 +-
.../impl/core/AppendEntriesRequestProcessor.java | 6 +-
.../ignite/raft/jraft/util/internal/ThrowUtil.java | 60 +++--
.../apache/ignite/raft/jraft/core/TestCluster.java | 94 +++++--
.../raft/jraft/rpc/AbstractClientServiceTest.java | 66 +----
.../ignite/raft/jraft/rpc/AbstractRpcTest.java | 91 +------
.../ignite/raft/jraft/rpc/IgniteRpcTest.java | 4 +-
.../ignite/raft/jraft/rpc/TestIgniteRpcServer.java | 11 +-
.../snapshot/local/LocalSnapshotReaderTest.java | 9 +-
30 files changed, 465 insertions(+), 550 deletions(-)
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 4ded54b..7e1316f 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -167,7 +167,7 @@ public class MetaStorageManager {
// );
// TODO: IGNITE-14414 Cluster initialization flow. Here we should complete metaStorageServiceFuture.
- clusterNetSvc.messagingService().addMessageHandler((message, sender, correlationId) -> {});
+ clusterNetSvc.messagingService().addMessageHandler((message, senderAddr, correlationId) -> {});
}
/**
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
index a60222c..4312ab9 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
@@ -58,6 +58,16 @@ public interface MessagingService {
CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg, String correlationId);
/**
+ * Same as {@link #send(ClusterNode, NetworkMessage)} but attaches the given correlation ID to the given message.
+ *
+ * @param addr Recipient network address in host:port format.
+ * @param msg Message which should be delivered.
+ * @param correlationId Correlation id when replying to the request.
+ * @return Future of the send operation.
+ */
+ CompletableFuture<Void> send(String addr, NetworkMessage msg, String correlationId);
+
+ /**
* Sends a message asynchronously with same guarantees as {@link #send(ClusterNode, NetworkMessage)} and
* returns a future that will be completed successfully upon receiving a response.
*
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
index 47f4737..94e2e0b 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
@@ -22,8 +22,9 @@ package org.apache.ignite.network;
public interface NetworkMessageHandler {
/**
* @param message Message which was received from the cluster.
- * @param sender Sender.
+ * @param senderAddr Sender address. Use
+ * {@link TopologyService#getByAddress(String)} to resolve a cluster node.
* @param correlationId Correlation id.
*/
- void onReceived(NetworkMessage message, ClusterNode sender, String correlationId);
+ void onReceived(NetworkMessage message, String senderAddr, String correlationId);
}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index 074cd4e..28c1956 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -78,7 +78,7 @@ class ITScaleCubeNetworkMessagingTest {
for (ClusterService member : testCluster.members) {
member.messagingService().addMessageHandler(
- (message, sender, correlationId) -> {
+ (message, senderAddr, correlationId) -> {
messageStorage.put(member.localConfiguration().getName(), (TestMessage)message);
messageReceivedLatch.countDown();
}
@@ -137,11 +137,11 @@ class ITScaleCubeNetworkMessagingTest {
class Data {
private final TestMessage message;
- private final ClusterNode sender;
+ private final String sender;
private final String correlationId;
- private Data(TestMessage message, ClusterNode sender, String correlationId) {
+ private Data(TestMessage message, String sender, String correlationId) {
this.message = message;
this.sender = sender;
this.correlationId = correlationId;
@@ -151,7 +151,8 @@ class ITScaleCubeNetworkMessagingTest {
var dataFuture = new CompletableFuture<Data>();
member.messagingService().addMessageHandler(
- (message, sender, correlationId) -> dataFuture.complete(new Data((TestMessage)message, sender, correlationId))
+ (message, senderAddr, correlationId) ->
+ dataFuture.complete(new Data((TestMessage)message, senderAddr, correlationId))
);
var requestMessage = messageFactory.testMessage().msg("request").build();
@@ -162,7 +163,7 @@ class ITScaleCubeNetworkMessagingTest {
Data actualData = dataFuture.get(3, TimeUnit.SECONDS);
assertThat(actualData.message.msg(), is(requestMessage.msg()));
- assertThat(actualData.sender, is(self));
+ assertThat(actualData.sender, is(self.address()));
assertThat(actualData.correlationId, is(correlationId));
}
@@ -181,7 +182,7 @@ class ITScaleCubeNetworkMessagingTest {
var requestMessage = messageFactory.testMessage().msg("request").build();
var responseMessage = messageFactory.testMessage().msg("response").build();
- member.messagingService().addMessageHandler((message, sender, correlationId) -> {
+ member.messagingService().addMessageHandler((message, senderAddr, correlationId) -> {
if (message.equals(requestMessage))
member.messagingService().send(self, responseMessage, correlationId);
});
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index d7d2685..cc4c5d1 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -54,7 +54,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
var topologyService = new ScaleCubeTopologyService();
- var messagingService = new ScaleCubeMessagingService(topologyService);
+ var messagingService = new ScaleCubeMessagingService();
var messageFactory = new NetworkMessagesFactory();
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index 6bf365b..a0c75e7 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -39,20 +39,6 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
private Cluster cluster;
/**
- * Topology service.
- */
- private ScaleCubeTopologyService topologyService;
-
- /**
- * Constructor.
- *
- * @param topologyService Topology service.
- */
- ScaleCubeMessagingService(ScaleCubeTopologyService topologyService) {
- this.topologyService = topologyService;
- }
-
- /**
* Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency injection.
*
* @param cluster Cluster.
@@ -68,15 +54,11 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
*/
void fireEvent(Message message) {
NetworkMessage msg = message.data();
- ClusterNode sender = topologyService.getByAddress(message.header(Message.HEADER_SENDER));
-
- if (sender == null) // Ignore the message from the unknown node.
- return;
String correlationId = message.correlationId();
for (NetworkMessageHandler handler : getMessageHandlers())
- handler.onReceived(msg, sender, correlationId);
+ handler.onReceived(msg, message.header(Message.HEADER_SENDER), correlationId);
}
/** {@inheritDoc} */
@@ -95,13 +77,19 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
/** {@inheritDoc} */
@Override public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg, String correlationId) {
+ return send(recipient.address(), msg, correlationId);
+ }
+
+ @Override public CompletableFuture<Void> send(String addr, NetworkMessage msg, String correlationId) {
var message = Message
.withData(msg)
.correlationId(correlationId)
.build();
+ Address address = Address.from(addr);
+
return cluster
- .send(clusterNodeAddress(recipient), message)
+ .send(address, message)
.toFuture();
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
index 0d04eec..bfccb55 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
@@ -333,6 +333,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
/** {@inheritDoc} */
@Override public void shutdown() {
+ // No-op.
}
/**
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
index 05b8a56..a7730be 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
@@ -104,15 +104,7 @@ public class ITCliServiceTest {
cluster.startLearner(peer);
cluster.waitLeader();
-
- for (Node follower : cluster.getFollowers())
- assertTrue(waitForCondition(() -> follower.getLeaderId() != null, 3_000));
-
- for (PeerId learner : cluster.getLearners()) {
- Node node = cluster.getNode(learner.getEndpoint());
-
- assertTrue(waitForCondition(() -> node.getLeaderId() != null, 3_000));
- }
+ cluster.ensureLeader(cluster.getLeader());
cliService = new CliServiceImpl();
conf = new Configuration(peers, learners);
@@ -187,11 +179,13 @@ public class ITCliServiceTest {
PeerId learner3 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + LEARNER_PORT_STEP + 3);
assertTrue(cluster.startLearner(learner3));
sendTestTaskAndWait(cluster.getLeader(), 0);
- sleep(500);
+ cluster.ensureSame(addr -> addr.equals(learner3.getEndpoint()));
+
for (MockStateMachine fsm : cluster.getFsms()) {
if (!fsm.getAddress().equals(learner3.getEndpoint()))
assertEquals(10, fsm.getLogs().size());
}
+
assertEquals(0, cluster.getFsmByPeer(learner3).getLogs().size());
List<PeerId> oldLearners = new ArrayList<PeerId>(conf.getLearners());
assertEquals(oldLearners, cliService.getLearners(groupId, conf));
@@ -199,13 +193,16 @@ public class ITCliServiceTest {
// Add learner3
cliService.addLearners(groupId, conf, Collections.singletonList(learner3));
- sleep(1000);
- assertEquals(10, cluster.getFsmByPeer(learner3).getLogs().size());
+
+ assertTrue(waitForCondition(() -> cluster.getFsmByPeer(learner3).getLogs().size() == 10, 5_000));
sendTestTaskAndWait(cluster.getLeader(), 0);
- sleep(1000);
+
+ cluster.ensureSame();
+
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(20, fsm.getLogs().size());
+
List<PeerId> newLearners = new ArrayList<>(oldLearners);
newLearners.add(learner3);
assertEquals(newLearners, cliService.getLearners(groupId, conf));
@@ -214,11 +211,14 @@ public class ITCliServiceTest {
// Remove 3
cliService.removeLearners(groupId, conf, Collections.singletonList(learner3));
sendTestTaskAndWait(cluster.getLeader(), 0);
- sleep(1000);
+
+ cluster.ensureSame(addr -> addr.equals(learner3.getEndpoint()));
+
for (MockStateMachine fsm : cluster.getFsms()) {
if (!fsm.getAddress().equals(learner3.getEndpoint()))
assertEquals(30, fsm.getLogs().size());
}
+
// Latest 10 logs are not replicated to learner3, because it's removed.
assertEquals(20, cluster.getFsmByPeer(learner3).getLogs().size());
assertEquals(oldLearners, cliService.getLearners(groupId, conf));
@@ -226,11 +226,13 @@ public class ITCliServiceTest {
// Set learners into [learner3]
cliService.resetLearners(groupId, conf, Collections.singletonList(learner3));
- sleep(100);
- assertEquals(30, cluster.getFsmByPeer(learner3).getLogs().size());
+
+ assertTrue(waitForCondition(() -> cluster.getFsmByPeer(learner3).getLogs().size() == 30, 5_000));
sendTestTaskAndWait(cluster.getLeader(), 0);
- sleep(1000);
+
+ cluster.ensureSame(addr -> oldLearners.contains(new PeerId(addr, 0)));
+
// Latest 10 logs are not replicated to learner1 and learner2, because they were removed by resetting learners set.
for (MockStateMachine fsm : cluster.getFsms()) {
if (!oldLearners.contains(new PeerId(fsm.getAddress(), 0)))
@@ -238,6 +240,7 @@ public class ITCliServiceTest {
else
assertEquals(30, fsm.getLogs().size());
}
+
assertEquals(Collections.singletonList(learner3), cliService.getLearners(groupId, conf));
assertEquals(Collections.singletonList(learner3), cliService.getAliveLearners(groupId, conf));
@@ -253,15 +256,18 @@ public class ITCliServiceTest {
PeerId peer3 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 3);
assertTrue(cluster.start(peer3.getEndpoint()));
sendTestTaskAndWait(cluster.getLeader(), 0);
- sleep(100);
+ cluster.ensureSame(addr -> addr.equals(peer3.getEndpoint()));
assertEquals(0, cluster.getFsmByPeer(peer3).getLogs().size());
assertTrue(cliService.addPeer(groupId, conf, peer3).isOk());
- sleep(100);
- assertEquals(10, cluster.getFsmByPeer(peer3).getLogs().size());
+
+ assertTrue(waitForCondition(() -> cluster.getFsmByPeer(peer3).getLogs().size() == 10, 5_000));
sendTestTaskAndWait(cluster.getLeader(), 0);
- sleep(100);
+
assertEquals(6, cluster.getFsms().size());
+
+ cluster.ensureSame();
+
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(20, fsm.getLogs().size());
@@ -269,8 +275,11 @@ public class ITCliServiceTest {
assertTrue(cliService.removePeer(groupId, conf, peer3).isOk());
sleep(200);
sendTestTaskAndWait(cluster.getLeader(), 0);
- sleep(1000);
+
assertEquals(6, cluster.getFsms().size());
+
+ cluster.ensureSame(addr -> addr.equals(peer3.getEndpoint()));
+
for (MockStateMachine fsm : cluster.getFsms()) {
if (fsm.getAddress().equals(peer3.getEndpoint()))
assertEquals(20, fsm.getLogs().size());
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
index 159bba9..5ac11c7 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
@@ -102,7 +102,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
- * Integration tests for raft cluster.
+ * Integration tests for raft cluster. TODO asch get rid of sleeps wherether possible IGNITE-14832
*/
public class ITNodeTest {
private static final Logger LOG = LoggerFactory.getLogger(ITNodeTest.class);
@@ -177,7 +177,8 @@ public class ITNodeTest {
services.forEach(service -> {
try {
service.shutdown();
- } catch (Exception e) {
+ }
+ catch (Exception e) {
LOG.error("Error while closing a service", e);
}
});
@@ -204,7 +205,7 @@ public class ITNodeTest {
RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions);
- service.start(true);
+ service.start();
}
@Test
@@ -225,7 +226,7 @@ public class ITNodeTest {
RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions);
- Node node = service.start(true);
+ final Node node = service.start();
assertEquals(1, node.listPeers().size());
assertTrue(node.listPeers().contains(peer));
@@ -303,7 +304,7 @@ public class ITNodeTest {
RaftGroupService service = createService("unittest", peer, nodeOptions);
- Node node = service.start(true);
+ final Node node = service.start();
assertEquals(1, node.listPeers().size());
assertTrue(node.listPeers().contains(peer));
@@ -350,8 +351,8 @@ public class ITNodeTest {
applyLatch.countDown();
// The state machine is in error state, the node should step down.
- while (node.isLeader())
- Thread.sleep(10);
+ waitForCondition(() -> !node.isLeader(), 5_000);
+
latch.await();
applyCompleteLatch.await();
}
@@ -416,51 +417,6 @@ public class ITNodeTest {
waitLatch(latch);
}
- private void sendTestTaskAndWait(Node node) throws InterruptedException {
- sendTestTaskAndWait(node, 0, 10, RaftError.SUCCESS);
- }
-
- private void sendTestTaskAndWait(Node node, int amount) throws InterruptedException {
- sendTestTaskAndWait(node, 0, amount, RaftError.SUCCESS);
- }
-
- private void sendTestTaskAndWait(Node node, RaftError err) throws InterruptedException {
- sendTestTaskAndWait(node, 0, 10, err);
- }
-
- private void sendTestTaskAndWait(Node node, int start, int amount,
- RaftError err) throws InterruptedException {
- CountDownLatch latch = new CountDownLatch(amount);
- for (int i = start; i < start + amount; i++) {
- ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes());
- Task task = new Task(data, new ExpectClosure(err, latch));
- node.apply(task);
- }
- waitLatch(latch);
- }
-
- private void sendTestTaskAndWait(Node node, int start,
- RaftError err) throws InterruptedException {
- sendTestTaskAndWait(node, start, 10, err);
- }
-
- @SuppressWarnings("SameParameterValue")
- private void sendTestTaskAndWait(String prefix, Node node, int code) throws InterruptedException {
- sendTestTaskAndWait(prefix, node, 10, code);
- }
-
- @SuppressWarnings("SameParameterValue")
- private void sendTestTaskAndWait(String prefix, Node node, int amount,
- int code) throws InterruptedException {
- CountDownLatch latch = new CountDownLatch(10);
- for (int i = 0; i < amount; i++) {
- ByteBuffer data = ByteBuffer.wrap((prefix + i).getBytes());
- Task task = new Task(data, new ExpectClosure(code, null, latch));
- node.apply(task);
- }
- waitLatch(latch);
- }
-
@Test
public void testTripleNodesWithReplicatorStateListener() throws Exception {
List<PeerId> peers = TestUtils.generatePeers(3);
@@ -477,6 +433,7 @@ public class ITNodeTest {
// elect leader
cluster.waitLeader();
+ cluster.ensureLeader(cluster.getLeader());
for (Node follower : cluster.getFollowers())
waitForCondition(() -> follower.getLeaderId() != null, 5_000);
@@ -509,6 +466,8 @@ public class ITNodeTest {
// get leader
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
+
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
sendTestTaskAndWait(leader);
@@ -571,8 +530,9 @@ public class ITNodeTest {
assertTrue(cluster.start(peer.getEndpoint()));
cluster.waitLeader();
-
Node leader = cluster.getLeader();
+ cluster.ensureLeader(leader);
+
sendTestTaskAndWait(leader);
Thread.sleep(100);
List<Node> followers = cluster.getFollowers();
@@ -608,6 +568,8 @@ public class ITNodeTest {
// get leader
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
+
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
sendTestTaskAndWait(leader);
@@ -671,7 +633,7 @@ public class ITNodeTest {
.singletonList(learnerPeer)));
learnerServer = createService("unittest", new PeerId(learnerAddr, 0), nodeOptions);
- learnerServer.start(true);
+ learnerServer.start();
}
{
@@ -686,7 +648,7 @@ public class ITNodeTest {
.singletonList(learnerPeer)));
RaftGroupService server = createService("unittest", new PeerId(addr, 0), nodeOptions);
- Node node = server.start(true);
+ Node node = server.start();
assertEquals(1, node.listPeers().size());
assertTrue(node.listPeers().contains(peer));
@@ -730,12 +692,12 @@ public class ITNodeTest {
cluster.waitLeader();
Node leader = cluster.getLeader();
+ cluster.ensureLeader(leader);
waitForCondition(() -> leader.listAlivePeers().size() == 3, 5_000);
waitForCondition(() -> leader.listAliveLearners().size() == 3, 5_000);
sendTestTaskAndWait(leader);
- Thread.sleep(500);
List<MockStateMachine> fsms = cluster.getFsms();
assertEquals(6, fsms.size());
cluster.ensureSame();
@@ -799,6 +761,7 @@ public class ITNodeTest {
// elect leader
cluster.waitLeader();
Node leader = cluster.getLeader();
+ cluster.ensureLeader(leader);
assertEquals(3, leader.listPeers().size());
assertEquals(1, leader.listLearners().size());
@@ -835,6 +798,8 @@ public class ITNodeTest {
// get leader
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
+
assertEquals(3, leader.listPeers().size());
assertTrue(leader.listLearners().isEmpty());
assertTrue(leader.listAliveLearners().isEmpty());
@@ -901,6 +866,7 @@ public class ITNodeTest {
assertTrue(done.await().isOk());
assertEquals(2, leader.listAliveLearners().size());
assertEquals(2, leader.listLearners().size());
+ cluster.ensureSame();
}
{
// stop two followers
@@ -915,13 +881,11 @@ public class ITNodeTest {
assertEquals(RaftError.EPERM, done.getStatus().getRaftError());
// One peer with two learners.
assertEquals(3, cluster.getFsms().size());
- cluster.ensureSame();
}
}
@Test
public void testNodesWithPriorityElection() throws Exception {
-
List<Integer> priorities = new ArrayList<>();
priorities.add(100);
priorities.add(40);
@@ -939,6 +903,8 @@ public class ITNodeTest {
// get leader
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
+
assertEquals(3, leader.listPeers().size());
assertEquals(100, leader.getNodeTargetPriority());
assertEquals(100, leader.getLeaderId().getPriority());
@@ -947,7 +913,6 @@ public class ITNodeTest {
@Test
public void testNodesWithPartPriorityElection() throws Exception {
-
List<Integer> priorities = new ArrayList<>();
priorities.add(100);
priorities.add(40);
@@ -965,6 +930,8 @@ public class ITNodeTest {
// get leader
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
+
assertEquals(3, leader.listPeers().size());
assertEquals(2, cluster.getFollowers().size());
}
@@ -989,6 +956,8 @@ public class ITNodeTest {
// get leader
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
+
assertEquals(3, leader.listPeers().size());
assertEquals(2, cluster.getFollowers().size());
}
@@ -1013,6 +982,8 @@ public class ITNodeTest {
// get leader
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
+
assertEquals(3, leader.listPeers().size());
assertEquals(2, cluster.getFollowers().size());
assertEquals(50, leader.getNodeTargetPriority());
@@ -1056,6 +1027,7 @@ public class ITNodeTest {
cluster.waitLeader();
Node leader = cluster.getLeader();
+ cluster.ensureLeader(leader);
assertNotNull(leader);
assertEquals(100, leader.getNodeId().getPeerId().getPriority());
@@ -1110,6 +1082,8 @@ public class ITNodeTest {
// get leader
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
+
assertEquals(100, leader.getNodeTargetPriority());
assertEquals(100, leader.getNodeId().getPeerId().getPriority());
@@ -1249,6 +1223,7 @@ public class ITNodeTest {
// get leader
Node leader = cluster.getLeader();
assertNotNull(leader);
+
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
sendTestTaskAndWait(leader);
@@ -1294,6 +1269,7 @@ public class ITNodeTest {
// get leader
Node leader = cluster.getLeader();
assertNotNull(leader);
+
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
sendTestTaskAndWait(leader);
@@ -1440,31 +1416,6 @@ public class ITNodeTest {
assertEquals(10000, fsm.getLogs().size());
}
- @SuppressWarnings({"unused", "SameParameterValue"})
- private boolean assertReadIndex(Node node, int index) throws InterruptedException {
- CountDownLatch latch = new CountDownLatch(1);
- byte[] requestContext = TestUtils.getRandomBytes();
- AtomicBoolean success = new AtomicBoolean(false);
- node.readIndex(requestContext, new ReadIndexClosure() {
-
- @Override
- public void run(Status status, long theIndex, byte[] reqCtx) {
- if (status.isOk()) {
- assertEquals(index, theIndex);
- assertArrayEquals(requestContext, reqCtx);
- success.set(true);
- }
- else {
- assertTrue(status.getErrorMsg().contains("RPC exception:Check connection["), status.getErrorMsg());
- assertTrue(status.getErrorMsg().contains("] fail and try to create new one"), status.getErrorMsg());
- }
- latch.countDown();
- }
- });
- latch.await();
- return success.get();
- }
-
@Test
public void testNodeMetrics() throws Exception {
List<PeerId> peers = TestUtils.generatePeers(3);
@@ -1661,10 +1612,6 @@ public class ITNodeTest {
assertEquals(10, fsm.getLogs().size());
}
- private void waitLatch(CountDownLatch latch) throws InterruptedException {
- assertTrue(latch.await(30, TimeUnit.SECONDS));
- }
-
@Test
public void testRemoveFollower() throws Exception {
List<PeerId> peers = TestUtils.generatePeers(3);
@@ -1679,6 +1626,8 @@ public class ITNodeTest {
// get leader
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
+
// apply tasks to leader
sendTestTaskAndWait(leader);
@@ -1739,6 +1688,8 @@ public class ITNodeTest {
// get leader
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
+
// apply tasks to leader
sendTestTaskAndWait(leader);
@@ -1755,7 +1706,6 @@ public class ITNodeTest {
CountDownLatch latch = new CountDownLatch(1);
leader.removePeer(oldLeader, new ExpectClosure(latch));
waitLatch(latch);
- Thread.sleep(100);
// elect new leader
cluster.waitLeader();
@@ -1916,9 +1866,6 @@ public class ITNodeTest {
LOG.info("start follower {}", followerAddr2);
assertTrue(cluster.start(followerAddr2, true, 300));
- // Make sure the leader has discovered new nodes.
- assertTrue(waitForTopology(cluster, leaderAddr, 3, 30_000)); // Discovery may take a while sometimes.
-
CountDownLatch latch = new CountDownLatch(1);
LOG.info("Add old follower {}", followerAddr1);
leader.addPeer(followerPeer1, new ExpectClosure(latch));
@@ -1942,7 +1889,7 @@ public class ITNodeTest {
* @throws Exception
*/
@Test
- public void testRestoreSnasphot() throws Exception {
+ public void testRestoreSnapshot() throws Exception {
List<PeerId> peers = TestUtils.generatePeers(3);
cluster = new TestCluster("unitest", dataPath, peers);
@@ -2021,22 +1968,6 @@ public class ITNodeTest {
assertEquals(1, fsm.getLoadSnapshotTimes());
}
- private void triggerLeaderSnapshot(TestCluster cluster, Node leader) throws InterruptedException {
- triggerLeaderSnapshot(cluster, leader, 1);
- }
-
- private void triggerLeaderSnapshot(TestCluster cluster, Node leader, int times)
- throws InterruptedException {
- // trigger leader snapshot
- // first snapshot will be triggered randomly
- int snapshotTimes = cluster.getLeaderFsm().getSaveSnapshotTimes();
- assertTrue(snapshotTimes == times - 1 || snapshotTimes == times, "snapshotTimes=" + snapshotTimes + ", times=" + times);
- CountDownLatch latch = new CountDownLatch(1);
- leader.snapshot(new ExpectClosure(latch));
- waitLatch(latch);
- assertEquals(snapshotTimes + 1, cluster.getLeaderFsm().getSaveSnapshotTimes());
- }
-
@Test
public void testInstallSnapshotWithThrottle() throws Exception {
List<PeerId> peers = TestUtils.generatePeers(3);
@@ -2050,6 +1981,8 @@ public class ITNodeTest {
// get leader
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
+
// apply tasks to leader
sendTestTaskAndWait(leader);
@@ -2092,6 +2025,7 @@ public class ITNodeTest {
}
@Test // TODO add test for timeout on snapshot install https://issues.apache.org/jira/browse/IGNITE-14832
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-14943")
public void testInstallLargeSnapshotWithThrottle() throws Exception {
List<PeerId> peers = TestUtils.generatePeers(4);
cluster = new TestCluster("unitest", dataPath, peers.subList(0, 3));
@@ -2104,6 +2038,8 @@ public class ITNodeTest {
// get leader
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
+
// apply tasks to leader
sendTestTaskAndWait(leader, 0, RaftError.SUCCESS);
@@ -2160,6 +2096,8 @@ public class ITNodeTest {
// get leader
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
+
// apply tasks to leader
sendTestTaskAndWait(leader, 0, RaftError.SUCCESS);
@@ -2205,7 +2143,6 @@ public class ITNodeTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-14853")
public void testInstallSnapshot() throws Exception {
List<PeerId> peers = TestUtils.generatePeers(3);
@@ -2218,6 +2155,8 @@ public class ITNodeTest {
// get leader
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
+
// apply tasks to leader
sendTestTaskAndWait(leader);
@@ -2342,13 +2281,11 @@ public class ITNodeTest {
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
- // Ensure the quorum before stopping a follower, otherwise leader can step down.
- assertTrue(waitForCondition(() -> followers.get(1).getLeaderId() != null, 5_000));
-
Endpoint followerAddr = followers.get(0).getNodeId().getPeerId().getEndpoint().copy();
assertTrue(cluster.stop(followerAddr));
@@ -2392,7 +2329,6 @@ public class ITNodeTest {
PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
LOG.info("Transfer leadership from {} to {}", leader, targetPeer);
assertTrue(leader.transferLeadershipTo(targetPeer).isOk());
- Thread.sleep(1000);
cluster.waitLeader();
leader = cluster.getLeader();
assertEquals(leader.getNodeId().getPeerId(), targetPeer);
@@ -2411,13 +2347,11 @@ public class ITNodeTest {
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
- // Ensure the quorum before stopping a follower, otherwise leader can step down.
- assertTrue(waitForCondition(() -> followers.get(1).getLeaderId() != null, 5_000));
-
PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
assertTrue(cluster.stop(targetPeer.getEndpoint()));
@@ -2453,6 +2387,7 @@ public class ITNodeTest {
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
@@ -2521,7 +2456,7 @@ public class ITNodeTest {
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0))));
RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions);
- Node node = service.start(true);
+ Node node = service.start();
Thread.sleep(1000);
sendTestTaskAndWait(node);
@@ -2544,7 +2479,7 @@ public class ITNodeTest {
RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions);
try {
- service.start(true);
+ service.start();
fail();
}
@@ -2635,7 +2570,7 @@ public class ITNodeTest {
PeerId follower = followers.get(0).getNodeId().getPeerId();
assertTrue(leader.transferLeadershipTo(follower).isOk());
- Thread.sleep(2000);
+ cluster.waitLeader();
leader = cluster.getLeader();
assertEquals(follower, leader.getNodeId().getPeerId());
@@ -2722,6 +2657,8 @@ public class ITNodeTest {
cluster.waitLeader();
Node firstLeader = cluster.getLeader();
assertNotNull(firstLeader);
+ cluster.ensureLeader(firstLeader);
+
// apply something
sendTestTaskAndWait(firstLeader);
@@ -2788,6 +2725,8 @@ public class ITNodeTest {
Node leader = cluster.getLeader();
assertNotNull(leader);
+ cluster.ensureLeader(leader);
+
sendTestTaskAndWait(leader);
// index == 1 is a CONFIGURATION log, so real_index will be 2 when returned.
@@ -2903,7 +2842,7 @@ public class ITNodeTest {
RaftGroupService service = createService("test", new PeerId(addr, 0), nodeOpts);
- Node node = service.start(true);
+ Node node = service.start();
assertEquals(26, fsm.getLogs().size());
for (int i = 0; i < 26; i++)
@@ -2941,7 +2880,7 @@ public class ITNodeTest {
RaftGroupService service = createService("test", new PeerId(addr, 0), nodeOpts);
- Node node = service.start(true);
+ Node node = service.start();
while (!node.isLeader())
Thread.sleep(20);
sendTestTaskAndWait(node);
@@ -2949,7 +2888,6 @@ public class ITNodeTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-14852")
public void testChangePeers() throws Exception {
PeerId peer0 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
cluster = new TestCluster("testChangePeers", dataPath, Collections.singletonList(peer0));
@@ -2975,7 +2913,12 @@ public class ITNodeTest {
Status status = done.await();
assertTrue(status.isOk(), status.getRaftError().toString());
}
- cluster.ensureSame();
+
+ cluster.waitLeader();
+
+ for (final MockStateMachine fsm : cluster.getFsms()) {
+ assertEquals(10, fsm.getLogs().size());
+ }
}
@Test
@@ -3349,7 +3292,7 @@ public class ITNodeTest {
assertNull(cluster.getLeader());
- Thread.sleep(3000);
+ Thread.sleep(2000);
assertNull(cluster.getLeader());
@@ -3376,7 +3319,13 @@ public class ITNodeTest {
}
/**
- * {@inheritDoc}
+ * TODO asch get rid of waiting for topology IGNITE-14832
+ *
+ * @param cluster
+ * @param addr
+ * @param expected
+ * @param timeout
+ * @return
*/
private boolean waitForTopology(TestCluster cluster, Endpoint addr, int expected, long timeout) {
RaftGroupService grp = cluster.getServer(addr);
@@ -3412,7 +3361,9 @@ public class ITNodeTest {
}
/**
- * {@inheritDoc}
+ * @param cond The condition.
+ * @param timeout The timeout.
+ * @return {@code True} if the condition is satisfied.
*/
private boolean waitForCondition(BooleanSupplier cond, long timeout) {
long stop = System.currentTimeMillis() + timeout;
@@ -3453,7 +3404,7 @@ public class ITNodeTest {
ClusterService clusterService = createClusterService(peerId.getEndpoint(), servers);
- IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, servers, nodeManager);
+ IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, servers, nodeManager, nodeOptions);
nodeOptions.setRpcClient(new IgniteRpcClient(clusterService));
@@ -3484,4 +3435,94 @@ public class ITNodeTest {
return clusterServiceFactory.createClusterService(clusterConfig);
}
+
+ private void sendTestTaskAndWait(final Node node) throws InterruptedException {
+ this.sendTestTaskAndWait(node, 0, 10, RaftError.SUCCESS);
+ }
+
+ private void sendTestTaskAndWait(final Node node, int amount) throws InterruptedException {
+ this.sendTestTaskAndWait(node, 0, amount, RaftError.SUCCESS);
+ }
+
+ private void sendTestTaskAndWait(final Node node, final RaftError err) throws InterruptedException {
+ this.sendTestTaskAndWait(node, 0, 10, err);
+ }
+
+ private void sendTestTaskAndWait(final Node node, final int start, int amount,
+ final RaftError err) throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(amount);
+ for (int i = start; i < start + amount; i++) {
+ final ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes());
+ final Task task = new Task(data, new ExpectClosure(err, latch));
+ node.apply(task);
+ }
+ waitLatch(latch);
+ }
+
+ private void sendTestTaskAndWait(final Node node, final int start,
+ final RaftError err) throws InterruptedException {
+ sendTestTaskAndWait(node, start, 10, err);
+ }
+
+ @SuppressWarnings("SameParameterValue")
+ private void sendTestTaskAndWait(final String prefix, final Node node, final int code) throws InterruptedException {
+ sendTestTaskAndWait(prefix, node, 10, code);
+ }
+
+ @SuppressWarnings("SameParameterValue")
+ private void sendTestTaskAndWait(final String prefix, final Node node, int amount,
+ final int code) throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(10);
+ for (int i = 0; i < amount; i++) {
+ final ByteBuffer data = ByteBuffer.wrap((prefix + i).getBytes());
+ final Task task = new Task(data, new ExpectClosure(code, null, latch));
+ node.apply(task);
+ }
+ waitLatch(latch);
+ }
+
+ private void triggerLeaderSnapshot(TestCluster cluster, Node leader) throws InterruptedException {
+ triggerLeaderSnapshot(cluster, leader, 1);
+ }
+
+ private void triggerLeaderSnapshot(TestCluster cluster, Node leader, int times)
+ throws InterruptedException {
+ // trigger leader snapshot
+ // first snapshot will be triggered randomly
+ int snapshotTimes = cluster.getLeaderFsm().getSaveSnapshotTimes();
+ assertTrue(snapshotTimes == times - 1 || snapshotTimes == times, "snapshotTimes=" + snapshotTimes + ", times=" + times);
+ CountDownLatch latch = new CountDownLatch(1);
+ leader.snapshot(new ExpectClosure(latch));
+ waitLatch(latch);
+ assertEquals(snapshotTimes + 1, cluster.getLeaderFsm().getSaveSnapshotTimes());
+ }
+
+ private void waitLatch(final CountDownLatch latch) throws InterruptedException {
+ assertTrue(latch.await(30, TimeUnit.SECONDS));
+ }
+
+ @SuppressWarnings({"unused", "SameParameterValue"})
+ private boolean assertReadIndex(Node node, int index) throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(1);
+ byte[] requestContext = TestUtils.getRandomBytes();
+ AtomicBoolean success = new AtomicBoolean(false);
+ node.readIndex(requestContext, new ReadIndexClosure() {
+
+ @Override
+ public void run(Status status, long theIndex, byte[] reqCtx) {
+ if (status.isOk()) {
+ assertEquals(index, theIndex);
+ assertArrayEquals(requestContext, reqCtx);
+ success.set(true);
+ }
+ else {
+ assertTrue(status.getErrorMsg().contains("RPC exception:Check connection["), status.getErrorMsg());
+ assertTrue(status.getErrorMsg().contains("] fail and try to create new one"), status.getErrorMsg());
+ }
+ latch.countDown();
+ }
+ });
+ latch.await();
+ return success.get();
+ }
}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
index c07a815..287a1d7 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Utils;
import org.junit.jupiter.api.AfterEach;
@@ -43,6 +44,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
+import static org.apache.ignite.raft.jraft.core.State.STATE_ERROR;
+import static org.apache.ignite.raft.jraft.core.State.STATE_LEADER;
import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
@@ -160,7 +163,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
servers.add(server);
- assertTrue(waitForTopology(service, servers.size(), 5_000));
+ assertTrue(waitForTopology(service, servers.size(), 15_000));
return server;
}
@@ -386,6 +389,11 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
client1.refreshLeader().get();
client2.refreshLeader().get();
+ NodeImpl leader = servers.stream().map(s -> ((NodeImpl) s.raftGroupService(COUNTER_GROUP_0).getRaftNode())).
+ filter(n -> n.getState() == STATE_LEADER).findFirst().orElse(null);
+
+ assertNotNull(leader);
+
long val1 = applyIncrements(client1, 1, 5);
long val2 = applyIncrements(client2, 1, 7);
@@ -407,11 +415,20 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
assertTrue(cause instanceof RaftException);
}
+ NodeImpl finalLeader = leader;
+ waitForCondition(() -> finalLeader.getState() == STATE_ERROR, 5_000);
+
+ // Client can't switch to new leader, because only one peer in the list.
try {
client1.<Long>run(new IncrementAndGetCommand(11)).get();
}
catch (Exception e) {
- assertTrue(e.getCause() instanceof TimeoutException, "New leader should not get elected");
+ boolean isValid = e.getCause() instanceof TimeoutException;
+
+ if (!isValid)
+ LOG.error("Got unexpected exception", e);
+
+ assertTrue(isValid, "Expecting the timeout");
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
index e29af9b..db9a3e8 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
@@ -173,7 +173,7 @@ public class JRaftServerImpl implements RaftServer {
final RaftGroupService server = new RaftGroupService(groupId, new PeerId(endpoint, 0,
ElectionPriority.DISABLED), nodeOptions, rpcServer, nodeManager, true);
- server.start(false);
+ server.start();
groups.put(groupId, server);
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
index 1139159..08bcb3e 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.Peer;
@@ -90,11 +89,11 @@ public class RaftServerImpl implements RaftServer {
readQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
writeQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
- service.messagingService().addMessageHandler((message, sender, correlationId) -> {
+ service.messagingService().addMessageHandler((message, senderAddr, correlationId) -> {
if (message instanceof GetLeaderRequest) {
GetLeaderResponse resp = clientMsgFactory.getLeaderResponse().leader(new Peer(service.topologyService().localMember().address())).build();
- service.messagingService().send(sender, resp, correlationId);
+ service.messagingService().send(senderAddr, resp, correlationId);
}
else if (message instanceof ActionRequest) {
ActionRequest req0 = (ActionRequest)message;
@@ -102,15 +101,15 @@ public class RaftServerImpl implements RaftServer {
RaftGroupListener lsnr = listeners.get(req0.groupId());
if (lsnr == null) {
- sendError(sender, correlationId, RaftErrorCode.ILLEGAL_STATE);
+ sendError(senderAddr, correlationId, RaftErrorCode.ILLEGAL_STATE);
return;
}
if (req0.command() instanceof ReadCommand)
- handleActionRequest(sender, req0, correlationId, readQueue, lsnr);
+ handleActionRequest(senderAddr, req0, correlationId, readQueue, lsnr);
else
- handleActionRequest(sender, req0, correlationId, writeQueue, lsnr);
+ handleActionRequest(senderAddr, req0, correlationId, writeQueue, lsnr);
}
// TODO https://issues.apache.org/jira/browse/IGNITE-14775
});
@@ -172,7 +171,7 @@ public class RaftServerImpl implements RaftServer {
* @param <T> Command type.
*/
private <T extends Command> void handleActionRequest(
- ClusterNode sender,
+ String sender,
ActionRequest req,
String corellationId,
BlockingQueue<CommandClosureEx<T>> queue,
@@ -223,7 +222,7 @@ public class RaftServerImpl implements RaftServer {
}
}
- private void sendError(ClusterNode sender, String corellationId, RaftErrorCode errorCode) {
+ private void sendError(String sender, String corellationId, RaftErrorCode errorCode) {
RaftErrorResponse resp = clientMsgFactory.raftErrorResponse().errorCode(errorCode).build();
service.messagingService().send(sender, resp, corellationId);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
index 846860e..e3c06de 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
@@ -110,15 +110,6 @@ public class RaftGroupService {
* Starts the raft group service, returns the raft node.
*/
public synchronized Node start() {
- return start(true);
- }
-
- /**
- * Starts the raft group service, returns the raft node.
- *
- * @param startRpcServer whether to start RPC server.
- */
- public synchronized Node start(final boolean startRpcServer) {
if (this.started) {
return this.node;
}
@@ -132,6 +123,14 @@ public class RaftGroupService {
assert this.nodeOptions.getRpcClient() != null;
+ // Should start RPC server before node initialization to avoid race.
+ if (!sharedRpcServer) {
+ this.rpcServer.init(null);
+ }
+ else {
+ LOG.info("RPC server is shared by RaftGroupService.");
+ }
+
this.node = new NodeImpl(groupId, serverId);
if (!this.node.init(this.nodeOptions)) {
@@ -148,13 +147,6 @@ public class RaftGroupService {
throw new IgniteInternalException("Fail to init node, please see the logs to find the reason.");
}
- if (startRpcServer) {
- this.rpcServer.init(null);
- }
- else {
- LOG.warn("RPC server is not started in RaftGroupService.");
- }
-
this.nodeManager.add(this.node);
this.started = true;
LOG.info("Start the RaftGroupService successfully {}", this.node.getNodeId());
@@ -163,14 +155,9 @@ public class RaftGroupService {
public synchronized void shutdown() {
// TODO asch remove handlers before shutting down raft node https://issues.apache.org/jira/browse/IGNITE-14519
- if (!this.started) {
- return;
- }
- if (this.rpcServer != null) {
+ if (this.rpcServer != null && !this.sharedRpcServer) {
try {
- if (!this.sharedRpcServer) {
- this.rpcServer.shutdown();
- }
+ this.rpcServer.shutdown();
}
catch (Exception e) {
LOG.error("Failed to shutdown the server", e);
@@ -178,6 +165,10 @@ public class RaftGroupService {
this.rpcServer = null;
}
+ if (!this.started) {
+ return;
+ }
+
this.node.shutdown();
try {
this.node.join();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 5855826..38ff515 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -1930,8 +1930,8 @@ public class NodeImpl implements Node, RaftServerService {
if (localPrevLogTerm != prevLogTerm) {
final long lastLogIndex = this.logManager.getLastLogIndex();
- LOG.warn(
- "Node {} reject term_unmatched AppendEntriesRequest from {}, term={}, prevLogIndex={}, prevLogTerm={}, localPrevLogTerm={}, lastLogIndex={}, entriesSize={}.",
+ LOG.warn("Node {} reject term_unmatched AppendEntriesRequest from {}, term={}, prevLogIndex={}, " +
+ "prevLogTerm={}, localPrevLogTerm={}, lastLogIndex={}, entriesSize={}.",
getNodeId(), request.getServerId(), request.getTerm(), prevLogIndex, prevLogTerm, localPrevLogTerm,
lastLogIndex, entriesCount);
@@ -3508,6 +3508,13 @@ public class NodeImpl implements Node, RaftServerService {
}
}
+ /**
+ * @return The state.
+ */
+ public State getState() {
+ return state;
+ }
+
@Override
public String toString() {
return "JRaftNode [nodeId=" + getNodeId() + "]";
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RemotingException.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RemotingException.java
index a5861ea..0901225 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RemotingException.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RemotingException.java
@@ -20,7 +20,6 @@ package org.apache.ignite.raft.jraft.error;
* Exception for default remoting problems.
*/
public class RemotingException extends Exception {
-
private static final long serialVersionUID = -6326244159775972292L;
public RemotingException() {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
index 5e473c8..6251d8b 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
@@ -16,6 +16,7 @@
*/
package org.apache.ignite.raft.jraft.rpc;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.network.TopologyEventHandler;
import org.apache.ignite.raft.jraft.Lifecycle;
import org.apache.ignite.raft.jraft.error.RemotingException;
@@ -28,10 +29,11 @@ import org.jetbrains.annotations.Nullable;
*/
public interface RpcClient extends Lifecycle<RpcOptions> {
/**
- * Check connection for given address. // TODO asch rename to isAlive.
+ * Check connection for given address.
*
* @param endpoint target address
* @return true if there is a connection and the connection is active and writable.
+ * @deprecated // TODO asch remove IGNITE-14832
*/
boolean checkConnection(Endpoint endpoint);
@@ -43,44 +45,6 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
void registerConnectEventListener(TopologyEventHandler handler);
/**
- * Synchronous invocation.
- *
- * @param endpoint target address
- * @param request request object
- * @param timeoutMs timeout millisecond
- * @return invoke result
- */
- default Object invokeSync(Endpoint endpoint, Object request, long timeoutMs)
- throws InterruptedException, RemotingException {
- return invokeSync(endpoint, request, null, timeoutMs);
- }
-
- /**
- * Synchronous invocation using a invoke context.
- *
- * @param endpoint target address
- * @param request request object
- * @param ctx invoke context
- * @param timeoutMs timeout millisecond
- * @return invoke result
- */
- Object invokeSync(Endpoint endpoint, Object request, @Nullable InvokeContext ctx,
- long timeoutMs) throws InterruptedException, RemotingException;
-
- /**
- * Asynchronous invocation with a callback.
- *
- * @param endpoint target address
- * @param request request object
- * @param callback invoke callback
- * @param timeoutMs timeout millisecond
- */
- default void invokeAsync(Endpoint endpoint, Object request, InvokeCallback callback,
- long timeoutMs) throws InterruptedException, RemotingException {
- invokeAsync(endpoint, request, null, callback, timeoutMs);
- }
-
- /**
* Asynchronous invocation with a callback.
*
* @param endpoint target address
@@ -88,8 +52,14 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
* @param ctx invoke context
* @param callback invoke callback
* @param timeoutMs timeout millisecond
+ *
+ * @return The future.
*/
- void invokeAsync(Endpoint endpoint, Object request, @Nullable InvokeContext ctx,
+ CompletableFuture<Message> invokeAsync(
+ Endpoint endpoint,
+ Object request,
+ @Nullable InvokeContext ctx,
InvokeCallback callback,
- long timeoutMs) throws InterruptedException, RemotingException;
+ long timeoutMs
+ ) throws InterruptedException, RemotingException;
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestClosure.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestClosure.java
index 5199d8c..809423e 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestClosure.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestClosure.java
@@ -58,6 +58,7 @@ public class RpcRequestClosure implements Closure {
LOG.warn("A response: {} sent repeatedly!", msg);
return;
}
+
this.rpcCtx.sendResponse(msg);
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestProcessor.java
index 112ab25..68d2d6f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestProcessor.java
@@ -46,6 +46,7 @@ public abstract class RpcRequestProcessor<T extends Message> implements RpcProce
public void handleRequest(final RpcContext rpcCtx, final T request) {
try {
final Message msg = processRequest(request, new RpcRequestClosure(rpcCtx, this.defaultResp));
+
if (msg != null) {
rpcCtx.sendResponse(msg);
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcResponseClosureAdapter.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcResponseClosureAdapter.java
index 086c9cd..da6581a 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcResponseClosureAdapter.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcResponseClosureAdapter.java
@@ -19,13 +19,9 @@ package org.apache.ignite.raft.jraft.rpc;
/**
* RpcResponseClosure adapter holds the response.
*
- *
- * 2018-Mar-29 2:30:35 PM
- *
* @param <T>
*/
public abstract class RpcResponseClosureAdapter<T extends Message> implements RpcResponseClosure<T> {
-
private T resp;
public T getResponse() {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
index 727f886..21dc6c8 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
@@ -18,6 +18,7 @@ package org.apache.ignite.raft.jraft.rpc.impl;
import java.net.ConnectException;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -39,6 +40,7 @@ import org.apache.ignite.raft.jraft.rpc.RpcResponseClosure;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.Utils;
import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.raft.jraft.util.internal.ThrowUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +57,7 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
/**
* The set of pinged addresses
*/
- private Set<String> readyAddresses = new ConcurrentHashSet<>();
+ protected Set<String> readyAddresses = new ConcurrentHashSet<>();
public RpcClient getRpcClient() {
return this.rpcClient;
@@ -120,24 +122,28 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
return true;
try {
- final RpcRequests.PingRequest req = RpcRequests.PingRequest.newBuilder() //
- .setSendTimestamp(System.currentTimeMillis()) //
+ final RpcRequests.PingRequest req = RpcRequests.PingRequest.newBuilder()
+ .setSendTimestamp(System.currentTimeMillis())
.build();
- final ErrorResponse resp = (ErrorResponse) rc.invokeSync(endpoint, req,
- this.rpcOptions.getRpcConnectTimeoutMs());
+ Future<Message> fut =
+ invokeWithDone(endpoint, req, null, null, rpcOptions.getRpcConnectTimeoutMs(), rpcExecutor);
- if (resp.getErrorCode() == 0) {
+ final ErrorResponse resp = (ErrorResponse) fut.get(); // Future will be certainly terminated by timeout.
+
+ if (resp != null && resp.getErrorCode() == 0) {
readyAddresses.add(endpoint.toString());
return true;
}
+ else
+ return false;
}
catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
- catch (final RemotingException e) {
- LOG.error("Fail to connect {}, remoting exception: {}.", endpoint, e.getMessage());
+ catch (final ExecutionException e) {
+ LOG.error("Fail to connect {}, exception: {}.", endpoint, e.getMessage());
}
}
@@ -169,6 +175,7 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
final RpcClient rc = this.rpcClient;
final FutureImpl<Message> future = new FutureImpl<>();
final Executor currExecutor = rpcExecutor != null ? rpcExecutor : this.rpcExecutor;
+
try {
if (rc == null) {
// TODO asch replace with ignite exception, check all places IGNITE-14832
@@ -179,14 +186,9 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
return future;
}
- rc.invokeAsync(endpoint, request, ctx, new InvokeCallback() {
+ return rc.invokeAsync(endpoint, request, ctx, new InvokeCallback() {
@Override
public void complete(final Object result, final Throwable err) {
- if (future.isCancelled()) {
- onCanceled(request, done);
- return;
- }
-
if (err == null) {
Status status = Status.OK();
Message msg;
@@ -213,8 +215,8 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
}
}
else {
- if (err instanceof ConnectException)
- readyAddresses.remove(endpoint); // Force logical reconnect.
+ if (ThrowUtil.hasCause(err, null, ConnectException.class))
+ readyAddresses.remove(endpoint.toString()); // Force logical reconnect.
if (done != null) {
try {
@@ -249,7 +251,6 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
// should be in another thread to avoid dead locking.
Utils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTERNAL,
"Fail to send a RPC request:" + e.getMessage()));
-
}
return future;
@@ -261,16 +262,4 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
status.setErrorMsg(eResp.getErrorMsg());
return status;
}
-
- private <T extends Message> void onCanceled(final Message request, final RpcResponseClosure<T> done) {
- if (done != null) {
- try {
- done.run(new Status(RaftError.ECANCELED, "RPC request was canceled by future."));
- }
- catch (final Throwable t) {
- LOG.error("Fail to run RpcResponseClosure, the request is {}.", request, t);
- }
- }
- }
-
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
index 6d23508..1cd2909 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
@@ -33,6 +33,7 @@ import org.apache.ignite.raft.jraft.error.RemotingException;
import org.apache.ignite.raft.jraft.option.RpcOptions;
import org.apache.ignite.raft.jraft.rpc.InvokeCallback;
import org.apache.ignite.raft.jraft.rpc.InvokeContext;
+import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RpcClientEx;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.Utils;
@@ -68,52 +69,14 @@ public class IgniteRpcClient implements RpcClientEx {
}
/** {@inheritDoc} */
- @Override public Object invokeSync(Endpoint endpoint, Object request, InvokeContext ctx,
- long timeoutMs) throws InterruptedException, RemotingException {
- if (!checkConnection(endpoint))
- throw new RemotingException("Server is dead " + endpoint);
-
- CompletableFuture<Object> fut = new CompletableFuture();
-
- // Future hashcode used as corellation id.
- if (recordPred != null && recordPred.test(request, endpoint.toString()))
- recordedMsgs.add(new Object[] {request, endpoint.toString(), fut.hashCode(), System.currentTimeMillis(), null});
-
- boolean wasBlocked;
-
- synchronized (this) {
- wasBlocked = blockPred != null && blockPred.test(request, endpoint.toString());
-
- if (wasBlocked)
- blockedMsgs.add(new Object[] {request, endpoint.toString(), fut.hashCode(), System.currentTimeMillis(), (Runnable) () -> send(endpoint, request, fut, timeoutMs)});
- }
-
- if (!wasBlocked)
- send(endpoint, request, fut, timeoutMs);
-
- try {
- return fut.whenComplete((res, err) -> {
- assert !(res == null && err == null) : res + " " + err;
-
- if (err == null && recordPred != null && recordPred.test(res, this.toString()))
- recordedMsgs.add(new Object[] {res, this.toString(), fut.hashCode(), System.currentTimeMillis(), null});
- }).get(timeoutMs, TimeUnit.MILLISECONDS);
- }
- catch (ExecutionException e) {
- throw new RemotingException(e);
- }
- catch (TimeoutException e) {
- throw new InvokeTimeoutException();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void invokeAsync(Endpoint endpoint, Object request, InvokeContext ctx, InvokeCallback callback,
- long timeoutMs) throws InterruptedException, RemotingException {
- if (!checkConnection(endpoint))
- throw new RemotingException("Server is dead " + endpoint);
-
- CompletableFuture<Object> fut = new CompletableFuture<>();
+ @Override public CompletableFuture<Message> invokeAsync(
+ Endpoint endpoint,
+ Object request,
+ InvokeContext ctx,
+ InvokeCallback callback,
+ long timeoutMs
+ ) throws InterruptedException, RemotingException {
+ CompletableFuture<Message> fut = new CompletableFuture<>();
fut.orTimeout(timeoutMs, TimeUnit.MILLISECONDS).
whenComplete((res, err) -> {
@@ -146,14 +109,16 @@ public class IgniteRpcClient implements RpcClientEx {
}
}});
- return;
+ return fut;
}
}
send(endpoint, request, fut, timeoutMs);
+
+ return fut;
}
- public void send(Endpoint endpoint, Object request, CompletableFuture<Object> fut, long timeout) {
+ public void send(Endpoint endpoint, Object request, CompletableFuture<Message> fut, long timeout) {
CompletableFuture<NetworkMessage> fut0 = service.messagingService().invoke(endpoint.toString(), (NetworkMessage) request, timeout);
fut0.whenComplete(new BiConsumer<NetworkMessage, Throwable>() {
@@ -161,7 +126,7 @@ public class IgniteRpcClient implements RpcClientEx {
if (err != null)
fut.completeExceptionally(err);
else
- fut.complete(resp);
+ fut.complete((Message) resp);
}
});
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 31404a1..5492264 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -84,7 +84,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
registerProcessor(new GetFileRequestProcessor(rpcExecutor));
registerProcessor(new InstallSnapshotRequestProcessor(rpcExecutor));
registerProcessor(new RequestVoteRequestProcessor(rpcExecutor));
- registerProcessor(new PingRequestProcessor(rpcExecutor));
+ registerProcessor(new PingRequestProcessor(rpcExecutor)); // TODO asch this should go last.
registerProcessor(new TimeoutNowRequestProcessor(rpcExecutor));
registerProcessor(new ReadIndexRequestProcessor(rpcExecutor));
// raft native cli service
@@ -104,7 +104,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
registerProcessor(new ActionRequestProcessor(rpcExecutor, FACTORY));
registerProcessor(new org.apache.ignite.raft.jraft.rpc.impl.client.SnapshotRequestProcessor(rpcExecutor, FACTORY));
- service.messagingService().addMessageHandler((msg, sender, corellationId) -> {
+ service.messagingService().addMessageHandler((msg, senderAddr, corellationId) -> {
Class<? extends NetworkMessage> cls = msg.getClass();
RpcProcessor<NetworkMessage> prc = processors.get(cls.getName());
@@ -143,11 +143,11 @@ public class IgniteRpcServer implements RpcServer<Void> {
}
@Override public void sendResponse(Object responseObj) {
- service.messagingService().send(sender, (NetworkMessage) responseObj, corellationId);
+ service.messagingService().send(senderAddr, (NetworkMessage) responseObj, corellationId);
}
@Override public String getRemoteAddress() {
- return sender.address();
+ return senderAddr;
}
@Override public String getLocalAddress() {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
index add5d94..f0f88e7 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
@@ -394,7 +394,7 @@ public class AppendEntriesRequestProcessor extends NodeRequestProcessor<AppendEn
}
private int getAndIncrementSequence(final String groupId, final PeerPair pair, NodeManager nodeManager) {
- // TODO asch can use getPeerContext because it must already present (created before) ???
+ // TODO asch can use getPeerContext because it must already present (created before) ??? IGNITE-14832
return getOrCreatePeerRequestContext(groupId, pair, nodeManager).getAndIncrementSequence();
}
@@ -416,11 +416,14 @@ public class AppendEntriesRequestProcessor extends NodeRequestProcessor<AppendEn
boolean isHeartbeat = isHeartbeatRequest(request);
int reqSequence = -1;
+
if (!isHeartbeat) {
reqSequence = getAndIncrementSequence(groupId, pair, done.getRpcCtx().getNodeManager());
}
+
final Message response = service.handleAppendEntriesRequest(request, new SequenceRpcRequestClosure(done,
defaultResp(), groupId, pair, reqSequence, isHeartbeat));
+
if (response != null) {
// heartbeat or probe request
if (isHeartbeat) {
@@ -430,6 +433,7 @@ public class AppendEntriesRequestProcessor extends NodeRequestProcessor<AppendEn
sendSequenceResponse(groupId, pair, reqSequence, done.getRpcCtx(), response);
}
}
+
return null;
}
else {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/internal/ThrowUtil.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/internal/ThrowUtil.java
index ea599ac..6b1b4e0 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/internal/ThrowUtil.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/internal/ThrowUtil.java
@@ -16,24 +16,16 @@
*/
package org.apache.ignite.raft.jraft.util.internal;
+import org.jetbrains.annotations.Nullable;
+
/**
* Throwing tool.
*/
public final class ThrowUtil {
-
- private static final ReferenceFieldUpdater<Throwable, Throwable> causeUpdater = Updaters.newReferenceFieldUpdater(
- Throwable.class, "cause");
-
/**
* Raises an exception bypassing compiler checks for checked exceptions.
*/
public static void throwException(final Throwable t) {
-// if (UnsafeUtil.hasUnsafe()) {
-// UnsafeUtil.throwException(t);
-// } else {
-// ThrowUtil.throwException0(t);
-// }
-
ThrowUtil.throwException0(t);
}
@@ -46,17 +38,47 @@ public final class ThrowUtil {
throw (E) t;
}
- public static <T extends Throwable> T cutCause(final T cause) {
- Throwable rootCause = cause;
- while (rootCause.getCause() != null) {
- rootCause = rootCause.getCause();
- }
+ /**
+ * Checks if passed in {@code 'Throwable'} has given class in {@code 'cause'} hierarchy
+ * <b>including</b> that throwable itself.
+ * <p>
+ * Note that this method follows includes {@link Throwable#getSuppressed()}
+ * into check.
+ *
+ * @param t Throwable to check (if {@code null}, {@code false} is returned).
+ * @param msg Message text that should be in cause.
+ * @param cls Cause classes to check (if {@code null} or empty, {@code false} is returned).
+ * @return {@code True} if one of the causing exception is an instance of passed in classes,
+ * {@code false} otherwise.
+ */
+ public static boolean hasCause(@Nullable Throwable t, @Nullable String msg, Class<?> @Nullable... cls) {
+ if (t == null || cls == null || cls.length == 0)
+ return false;
+
+ for (Throwable th = t; th != null; th = th.getCause()) {
+ for (Class<?> c : cls) {
+ if (c.isAssignableFrom(th.getClass())) {
+ if (msg != null) {
+ if (th.getMessage() != null && th.getMessage().contains(msg))
+ return true;
+ else
+ continue;
+ }
+
+ return true;
+ }
+ }
- if (rootCause != cause) {
- cause.setStackTrace(rootCause.getStackTrace());
- causeUpdater.set(cause, cause);
+ for (Throwable n : th.getSuppressed()) {
+ if (hasCause(n, msg, cls))
+ return true;
+ }
+
+ if (th.getCause() == th)
+ break;
}
- return cause;
+
+ return false;
}
private ThrowUtil() {
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 92fe82a..6dd2f90 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterLocalConfiguration;
@@ -51,14 +52,20 @@ import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.Utils;
import org.jetbrains.annotations.Nullable;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
/**
* Test cluster for NodeTest
*/
public class TestCluster {
- /** Default election timeout. */
- private static final int ELECTION_TIMEOUT = 300;
+ /**
+ * Default election timeout.
+ * Important: due to sync disk ops (writing raft meta) during probe request processing this timeout should be high
+ * enough to avoid test flakiness.
+ */
+ private static final int ELECTION_TIMEOUT_MILLIS = 600;
private static final IgniteLogger LOG = IgniteLogger.forClass(TestCluster.class);
@@ -97,17 +104,17 @@ public class TestCluster {
}
public TestCluster(final String name, final String dataPath, final List<PeerId> peers) {
- this(name, dataPath, peers, ELECTION_TIMEOUT);
+ this(name, dataPath, peers, ELECTION_TIMEOUT_MILLIS);
}
public TestCluster(final String name, final String dataPath, final List<PeerId> peers,
final int electionTimeoutMs) {
- this(name, dataPath, peers, new LinkedHashSet<>(), ELECTION_TIMEOUT, null);
+ this(name, dataPath, peers, new LinkedHashSet<>(), ELECTION_TIMEOUT_MILLIS, null);
}
public TestCluster(final String name, final String dataPath, final List<PeerId> peers,
final LinkedHashSet<PeerId> learners, final int electionTimeoutMs) {
- this(name, dataPath, peers, learners, ELECTION_TIMEOUT, null);
+ this(name, dataPath, peers, learners, ELECTION_TIMEOUT_MILLIS, null);
}
public TestCluster(final String name, final String dataPath, final List<PeerId> peers,
@@ -210,7 +217,7 @@ public class TestCluster {
nodeOptions.setRpcClient(rpcClient);
- var rpcServer = new TestIgniteRpcServer(clusterService, servers, nodeManager);
+ var rpcServer = new TestIgniteRpcServer(clusterService, servers, nodeManager, nodeOptions);
clusterService.start();
@@ -306,7 +313,7 @@ public class TestCluster {
public void clean(final Endpoint listenAddr) throws IOException {
final String path = this.dataPath + File.separator + listenAddr.toString().replace(':', '_');
- LOG.info("Clean dir: {0}", path);
+ LOG.info("Clean dir: {}", path);
Utils.delete(new File(path));
}
@@ -334,11 +341,18 @@ public class TestCluster {
return null;
}
+ /**
+ * Wait until a leader is elected.
+ * @throws InterruptedException
+ */
public void waitLeader() throws InterruptedException {
+ Node node;
+
while (true) {
- final Node node = getLeader();
+ node = getLeader();
+
if (node != null) {
- return;
+ break;
}
else {
Thread.sleep(10);
@@ -363,25 +377,38 @@ public class TestCluster {
}
/**
- * Ensure all peers leader is expectAddr
+ * Ensure all peers follow the leader
*
- * @param expectAddr expected address
+ * @param node The leader.
* @throws InterruptedException if interrupted
*/
- public void ensureLeader(final Endpoint expectAddr) throws InterruptedException {
+ public void ensureLeader(final Node node) throws InterruptedException {
while (true) {
this.lock.lock();
- for (final Node node : this.nodes) {
- final PeerId leaderId = node.getLeaderId();
- if (!leaderId.getEndpoint().equals(expectAddr)) {
- this.lock.unlock();
+ try {
+ boolean wait = false;
+
+ for (final Node node0 : this.nodes) {
+ final PeerId leaderId = node0.getLeaderId();
+
+ if (leaderId == null || !leaderId.equals(node.getNodeId().getPeerId())) {
+ wait = true;
+
+ break;
+ }
+ }
+
+ if (wait) {
Thread.sleep(10);
+
continue;
}
+ else
+ return;
+ }
+ finally {
+ this.lock.unlock();
}
- // all is ready
- this.lock.unlock();
- return;
}
}
@@ -424,11 +451,16 @@ public class TestCluster {
return ret;
}
+ public void ensureSame() throws InterruptedException {
+ ensureSame(addr -> false);
+ }
+
/**
+ * @param filter The node to exclude filter.
* @return {@code True} if all FSM state are the same.
* @throws InterruptedException
*/
- public void ensureSame() throws InterruptedException {
+ public void ensureSame(Predicate<Endpoint> filter) throws InterruptedException {
this.lock.lock();
List<MockStateMachine> fsmList = new ArrayList<>(this.fsms.values());
@@ -439,18 +471,25 @@ public class TestCluster {
return;
}
- LOG.info("Start ensureSame");
+ Node leader = getLeader();
+
+ assertNotNull(leader);
+
+ MockStateMachine first = fsms.get(leader.getNodeId().getPeerId());
+
+ LOG.info("Start ensureSame, leader={}", leader);
try {
assertTrue(TestUtils.waitForCondition(() -> {
- MockStateMachine first = fsmList.get(0);
-
first.lock();
try {
- for (int i = 1; i < fsmList.size(); i++) {
+ for (int i = 0; i < fsmList.size(); i++) {
MockStateMachine fsm = fsmList.get(i);
+ if (fsm == first || filter.test(fsm.getAddress()))
+ continue;
+
fsm.lock();
try {
@@ -482,7 +521,12 @@ public class TestCluster {
}
finally {
this.lock.unlock();
- LOG.info("End ensureSame");
+
+ Node leader1 = getLeader();
+
+ LOG.info("End ensureSame, leader={}", leader1);
+
+ assertSame("Leader shouldn't change while comparing fsms", leader, leader1);
}
}
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractClientServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractClientServiceTest.java
index a722941..48d9d7f 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractClientServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractClientServiceTest.java
@@ -16,8 +16,8 @@
*/
package org.apache.ignite.raft.jraft.rpc;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Status;
@@ -42,10 +42,11 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.when;
@RunWith(value = MockitoJUnitRunner.class)
public class AbstractClientServiceTest {
@@ -57,16 +58,16 @@ public class AbstractClientServiceTest {
private final Endpoint endpoint = new Endpoint("localhost", 8081);
@Before
- public void setup() {
+ public void setup() throws Exception {
this.rpcOptions = new RpcOptions();
this.rpcOptions.setClientExecutor(JRaftUtils.createClientExecutor(this.rpcOptions, "unittest"));
this.clientService = new MockClientService();
+ when(this.rpcClient.invokeAsync(any(), any(), any(), any(), anyLong())).thenReturn(new CompletableFuture<>());
this.rpcOptions.setRpcClient(this.rpcClient);
assertTrue(this.clientService.init(this.rpcOptions));
}
static class MockRpcResponseClosure<T extends Message> extends RpcResponseClosureAdapter<T> {
-
CountDownLatch latch = new CountDownLatch(1);
Status status;
@@ -79,33 +80,6 @@ public class AbstractClientServiceTest {
}
@Test
- public void testCancel() throws Exception {
- ArgumentCaptor<InvokeCallback> callbackArg = ArgumentCaptor.forClass(InvokeCallback.class);
- PingRequest request = TestUtils.createPingRequest();
-
- MockRpcResponseClosure<ErrorResponse> done = new MockRpcResponseClosure<>();
- Future<Message> future = this.clientService.invokeWithDone(this.endpoint, request, done, -1);
- Mockito.verify(this.rpcClient).invokeAsync(eq(this.endpoint), eq(request), Mockito.any(),
- callbackArg.capture(), eq((long) this.rpcOptions.getRpcDefaultTimeout()));
- InvokeCallback cb = callbackArg.getValue();
- assertNotNull(cb);
- assertNotNull(future);
-
- assertNull(done.getResponse());
- assertNull(done.status);
- assertFalse(future.isDone());
-
- future.cancel(true);
- ErrorResponse response = (ErrorResponse) this.rpcResponseFactory.newResponse(null, Status.OK());
- cb.complete(response, null);
-
- // The closure should be notified with ECANCELED error code.
- done.latch.await();
- assertNotNull(done.status);
- assertEquals(RaftError.ECANCELED.getNumber(), done.status.getCode());
- }
-
- @Test
public void testInvokeWithDoneOK() throws Exception {
ArgumentCaptor<InvokeCallback> callbackArg = ArgumentCaptor.forClass(InvokeCallback.class);
PingRequest request = TestUtils.createPingRequest();
@@ -125,11 +99,6 @@ public class AbstractClientServiceTest {
ErrorResponse response = (ErrorResponse) this.rpcResponseFactory.newResponse(null, Status.OK());
cb.complete(response, null);
- Message msg = future.get();
- assertNotNull(msg);
- assertTrue(msg instanceof ErrorResponse);
- assertSame(msg, response);
-
done.latch.await();
assertNotNull(done.status);
assertEquals(0, done.status.getCode());
@@ -155,14 +124,6 @@ public class AbstractClientServiceTest {
assertTrue(future.isDone());
- try {
- future.get();
- fail();
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof RemotingException);
- }
-
done.latch.await();
assertNotNull(done.status);
assertEquals(RaftError.EINTERNAL.getNumber(), done.status.getCode());
@@ -188,21 +149,13 @@ public class AbstractClientServiceTest {
cb.complete(null, new InvokeTimeoutException());
- try {
- future.get();
- fail();
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof InvokeTimeoutException);
- }
-
done.latch.await();
assertNotNull(done.status);
assertEquals(RaftError.ETIMEDOUT.getNumber(), done.status.getCode());
}
@Test
- public void testInvokeWithDOneOnErrorResponse() throws Exception {
+ public void testInvokeWithDoneOnErrorResponse() throws Exception {
final InvokeContext invokeCtx = new InvokeContext();
final ArgumentCaptor<InvokeCallback> callbackArg = ArgumentCaptor.forClass(InvokeCallback.class);
final CliRequests.GetPeersRequest request = CliRequests.GetPeersRequest.newBuilder() //
@@ -226,11 +179,6 @@ public class AbstractClientServiceTest {
new Status(-1, "failed"));
cb.complete(resp, null);
- final Message msg = future.get();
-
- assertTrue(msg instanceof ErrorResponse);
- assertEquals(((ErrorResponse) msg).getErrorMsg(), "failed");
-
done.latch.await();
assertNotNull(done.status);
assertTrue(!done.status.isOk());
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
index 9948db1..be786ed 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
@@ -21,14 +21,10 @@ import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Endpoint;
-import org.apache.ignite.raft.jraft.util.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -97,16 +93,6 @@ public abstract class AbstractRpcTest {
}
@Test
- public void testSyncProcessing() throws Exception {
- RpcClient client = createClient();
- Response1 resp1 = (Response1) client.invokeSync(endpoint, new Request1(), new InvokeContext(), 5000);
- assertNotNull(resp1);
-
- Response2 resp2 = (Response2) client.invokeSync(endpoint, new Request2(), new InvokeContext(), 5000);
- assertNotNull(resp2);
- }
-
- @Test
public void testAsyncProcessing() throws Exception {
RpcClient client = createClient();
@@ -147,52 +133,6 @@ public abstract class AbstractRpcTest {
}
@Test
- public void testRecordedSync() throws Exception {
- RpcClientEx client1 = (RpcClientEx) createClient();
- client1.recordMessages((a, b) -> true);
-
- assertTrue(client1.checkConnection(endpoint));
-
- Response1 resp1 = (Response1) client1.invokeSync(endpoint, new Request1(), 500);
- Response2 resp2 = (Response2) client1.invokeSync(endpoint, new Request2(), 500);
-
- assertNotNull(resp1);
- assertNotNull(resp2);
-
- Queue<Object[]> recorded = client1.recordedMessages();
-
- assertEquals(4, recorded.size());
- assertTrue(recorded.poll()[0] instanceof Request1);
- assertTrue(recorded.poll()[0] instanceof Response1);
- assertTrue(recorded.poll()[0] instanceof Request2);
- assertTrue(recorded.poll()[0] instanceof Response2);
- }
-
- @Test
- public void testRecordedSyncTimeout() {
- RpcClientEx client1 = (RpcClientEx) createClient();
- client1.recordMessages((a, b) -> true);
-
- assertTrue(client1.checkConnection(endpoint));
-
- try {
- Request1 request = new Request1();
- request.val = 10_000;
- client1.invokeSync(endpoint, request, 500);
-
- fail();
- }
- catch (Exception ignored) {
- // Expected.
- }
-
- Queue<Object[]> recorded = client1.recordedMessages();
-
- assertEquals(1, recorded.size());
- assertTrue(recorded.poll()[0] instanceof Request1);
- }
-
- @Test
public void testRecordedAsync() throws Exception {
RpcClientEx client1 = (RpcClientEx) createClient();
client1.recordMessages((a, b) -> true);
@@ -245,35 +185,6 @@ public abstract class AbstractRpcTest {
}
@Test
- public void testBlockedSync() throws Exception {
- RpcClientEx client1 = (RpcClientEx) createClient();
- client1.blockMessages((msg, id) -> msg instanceof Request1);
-
- assertTrue(client1.checkConnection(endpoint));
-
- Response2 resp2 = (Response2) client1.invokeSync(endpoint, new Request2(), 500);
-
- assertEquals(1, resp2.val);
-
- ExecutorService executorService = Executors.newSingleThreadExecutor();
-
- Future<Response1> resp = Utils.runInThread(executorService,
- () -> (Response1) client1.invokeSync(endpoint, new Request1(), 30_000));
-
- Thread.sleep(500);
-
- Queue<Object[]> msgs = client1.blockedMessages();
-
- assertEquals(1, msgs.size());
-
- assertFalse(resp.isDone());
-
- client1.stopBlock();
-
- resp.get(5_000, TimeUnit.MILLISECONDS);
- }
-
- @Test
public void testBlockedAsync() throws Exception {
RpcClientEx client1 = (RpcClientEx) createClient();
client1.blockMessages((msg, id) -> msg instanceof Request1);
@@ -282,7 +193,7 @@ public abstract class AbstractRpcTest {
CompletableFuture<Object> resp = new CompletableFuture<>();
- client1.invokeAsync(endpoint, new Request1(), (result, err) -> resp.complete(result), 30_000);
+ client1.invokeAsync(endpoint, new Request1(), null, (result, err) -> resp.complete(result), 30_000);
Thread.sleep(500);
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
index d5370b9..6c6abe4 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.NodeManager;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.util.Endpoint;
@@ -34,11 +35,10 @@ public class IgniteRpcTest extends AbstractRpcTest {
/** The counter. */
private final AtomicInteger cntr = new AtomicInteger();
- /** {@inheritDoc} */
@Override public RpcServer<?> createServer(Endpoint endpoint) {
ClusterService service = createService(endpoint.toString(), endpoint.getPort(), List.of());
- var server = new TestIgniteRpcServer(service, List.of(), new NodeManager()) {
+ var server = new TestIgniteRpcServer(service, List.of(), new NodeManager(), new NodeOptions()) {
@Override public void shutdown() {
super.shutdown();
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
index 9052f10..2dd9fa0 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
@@ -30,16 +30,17 @@ import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
*/
public class TestIgniteRpcServer extends IgniteRpcServer {
/**
- * @param clusterService cluster service
- * @param servers server list
- * @param nodeManager node manager
+ * @param clusterService Cluster service.
+ * @param servers Server list.
+ * @param nodeManager Node manager
+ * @param nodeOptions Node options.
*/
- public TestIgniteRpcServer(ClusterService clusterService, List<String> servers, NodeManager nodeManager) {
+ public TestIgniteRpcServer(ClusterService clusterService, List<String> servers, NodeManager nodeManager, NodeOptions nodeOptions) {
super(
clusterService,
nodeManager,
new RaftClientMessagesFactory(),
- JRaftUtils.createRequestExecutor(new NodeOptions())
+ JRaftUtils.createRequestExecutor(nodeOptions)
);
}
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotReaderTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotReaderTest.java
index 5f7e05f..2299e01 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotReaderTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotReaderTest.java
@@ -37,7 +37,6 @@ import static org.junit.Assert.assertTrue;
@RunWith(value = MockitoJUnitRunner.class)
public class LocalSnapshotReaderTest extends BaseStorageTest {
-
private LocalSnapshotReader reader;
@Mock
private LocalSnapshotStorage snapshotStorage;
@@ -48,13 +47,13 @@ public class LocalSnapshotReaderTest extends BaseStorageTest {
@Before
public void setup() throws Exception {
super.setup();
- this.path = this.path + File.separator + Snapshot.JRAFT_SNAPSHOT_PREFIX + snapshotIndex;
- new File(path).mkdirs();
+ String snapPath = this.path + File.separator + Snapshot.JRAFT_SNAPSHOT_PREFIX + snapshotIndex;
+ new File(snapPath).mkdirs();
this.table = new LocalSnapshotMetaTable(new RaftOptions());
this.table.addFile("testFile", LocalFileMetaOutter.LocalFileMeta.newBuilder().setChecksum("test").build());
- table.saveToFile(path + File.separator + Snapshot.JRAFT_SNAPSHOT_META_FILE);
+ table.saveToFile(snapPath + File.separator + Snapshot.JRAFT_SNAPSHOT_META_FILE);
this.reader = new LocalSnapshotReader(snapshotStorage, null, new Endpoint("localhost", 8081),
- new RaftOptions(), path);
+ new RaftOptions(), snapPath);
assertTrue(this.reader.init(null));
}