You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/06/21 07:09:12 UTC
[ignite-3] branch main updated: IGNITE-14918 Externalize ClusterService lifecycle management - Fixes #183.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7ccdb7d IGNITE-14918 Externalize ClusterService lifecycle management - Fixes #183.
7ccdb7d is described below
commit 7ccdb7d344bcbdb56a074ef41f4e206ec4da79c3
Author: Aleksandr Polovtsev <al...@gmail.com>
AuthorDate: Mon Jun 21 10:05:01 2021 +0300
IGNITE-14918 Externalize ClusterService lifecycle management - Fixes #183.
Signed-off-by: Alexey Scherbakov <al...@gmail.com>
---
.../client/ITMetaStorageServiceTest.java | 9 +-
.../scalecube/ScaleCubeClusterServiceFactory.java | 4 +
.../client/service/impl/RaftGroupServiceImpl.java | 13 +-
.../raft/client/service/RaftGroupServiceTest.java | 26 +--
.../ignite/raft/jraft/core/ITCliServiceTest.java | 41 ++---
.../apache/ignite/raft/jraft/core/ITNodeTest.java | 180 +++++++++++----------
.../raft/server/ITJRaftCounterServerTest.java | 32 ++--
.../raft/server/ITSimpleCounterServerTest.java | 38 +++--
.../java/org/apache/ignite/internal/raft/Loza.java | 5 +-
.../internal/raft/server/impl/JRaftServerImpl.java | 17 +-
.../internal/raft/server/impl/RaftServerImpl.java | 17 +-
.../raft/jraft/rpc/impl/IgniteRpcClient.java | 17 +-
.../raft/jraft/rpc/impl/IgniteRpcServer.java | 110 ++++++-------
.../apache/ignite/raft/jraft/core/TestCluster.java | 130 +++++++++------
.../ignite/raft/jraft/rpc/AbstractRpcTest.java | 14 +-
.../ignite/raft/jraft/rpc/IgniteRpcTest.java | 35 ++--
.../ignite/raft/jraft/rpc/TestIgniteRpcServer.java | 57 ++-----
.../ignite/distributed/ITDistributedTableTest.java | 19 +--
18 files changed, 374 insertions(+), 390 deletions(-)
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
index b7fc033..aae7d25 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
@@ -1013,11 +1013,7 @@ public class ITMetaStorageServiceTest {
private MetaStorageService prepareMetaStorage(KeyValueStorage keyValStorageMock) {
List<Peer> peers = List.of(new Peer(cluster.get(0).topologyService().localMember().address()));
- metaStorageRaftSrv = new RaftServerImpl(
- cluster.get(0),
- FACTORY,
- true
- );
+ metaStorageRaftSrv = new RaftServerImpl(cluster.get(0), FACTORY);
metaStorageRaftSrv.
startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(keyValStorageMock), peers);
@@ -1029,8 +1025,7 @@ public class ITMetaStorageServiceTest {
10_000,
peers,
true,
- 200,
- true
+ 200
);
return new MetaStorageServiceImpl(metaStorageRaftGrpSvc);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 154f825..d7d2685 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -103,6 +103,10 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
/** {@inheritDoc} */
@Override public void shutdown() {
+ // local member will be null, if cluster has not been started
+ if (cluster.member() == null)
+ return;
+
stopJmxMonitor();
cluster.shutdown();
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
index e4a938c..0d04eec 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
@@ -90,9 +90,6 @@ public class RaftGroupServiceImpl implements RaftGroupService {
/** */
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
- /** */
- private final boolean reuse;
-
/**
* @param groupId Group id.
* @param cluster A cluster.
@@ -101,7 +98,6 @@ public class RaftGroupServiceImpl implements RaftGroupService {
* @param peers Initial group configuration.
* @param refreshLeader {@code True} to synchronously refresh leader on service creation.
* @param retryDelay Retry delay.
- * @param reuse {@code True} to reuse cluster service (avoid lifecycle management).
*/
public RaftGroupServiceImpl(
String groupId,
@@ -110,8 +106,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
int timeout,
List<Peer> peers,
boolean refreshLeader,
- long retryDelay,
- boolean reuse
+ long retryDelay
) {
this.cluster = requireNonNull(cluster);
this.peers = requireNonNull(peers);
@@ -119,10 +114,6 @@ public class RaftGroupServiceImpl implements RaftGroupService {
this.timeout = timeout;
this.groupId = groupId;
this.retryDelay = retryDelay;
- this.reuse = reuse;
-
- if (!reuse)
- cluster.start();
if (refreshLeader) {
try {
@@ -342,8 +333,6 @@ public class RaftGroupServiceImpl implements RaftGroupService {
/** {@inheritDoc} */
@Override public void shutdown() {
- if (!reuse)
- cluster.shutdown();
}
/**
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
index de76740..c31687e 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
@@ -118,7 +118,7 @@ public class RaftGroupServiceTest {
mockLeaderRequest(false);
RaftGroupService service =
- new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+ new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
assertNull(service.leader());
@@ -140,7 +140,7 @@ public class RaftGroupServiceTest {
leader = null;
RaftGroupService service =
- new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+ new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
assertNull(service.leader());
@@ -175,7 +175,7 @@ public class RaftGroupServiceTest {
}, 500);
RaftGroupService service =
- new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+ new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
assertNull(service.leader());
@@ -194,7 +194,7 @@ public class RaftGroupServiceTest {
mockLeaderRequest(true);
RaftGroupService service =
- new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+ new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
try {
service.refreshLeader().get(500, TimeUnit.MILLISECONDS);
@@ -217,7 +217,7 @@ public class RaftGroupServiceTest {
mockUserInput(false, null);
RaftGroupService service =
- new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+ new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
service.refreshLeader().get();
@@ -237,7 +237,7 @@ public class RaftGroupServiceTest {
mockUserInput(false, null);
RaftGroupService service =
- new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+ new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
assertNull(service.leader());
@@ -259,7 +259,7 @@ public class RaftGroupServiceTest {
mockUserInput(true, null);
RaftGroupService service =
- new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+ new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
try {
service.run(new TestCommand()).get(500, TimeUnit.MILLISECONDS);
@@ -282,7 +282,7 @@ public class RaftGroupServiceTest {
mockUserInput(false, null);
RaftGroupService service =
- new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, true);
+ new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY);
Peer leader = this.leader;
@@ -313,7 +313,7 @@ public class RaftGroupServiceTest {
mockUserInput(false, null);
RaftGroupService service =
- new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, true);
+ new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY);
Peer leader = this.leader;
@@ -349,7 +349,7 @@ public class RaftGroupServiceTest {
mockUserInput(false, NODES.get(0));
RaftGroupService service =
- new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT * 3, NODES, true, DELAY, true);
+ new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT * 3, NODES, true, DELAY);
Peer leader = this.leader;
@@ -387,7 +387,7 @@ public class RaftGroupServiceTest {
mockUserInput(false, null);
RaftGroupService service =
- new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, true);
+ new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY);
Peer leader = this.leader;
@@ -418,7 +418,7 @@ public class RaftGroupServiceTest {
mockSnapshotRequest(1);
RaftGroupService service =
- new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+ new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
CompletableFuture<Void> fut = service.snapshot(new Peer("localhost:8082"));
@@ -442,7 +442,7 @@ public class RaftGroupServiceTest {
mockSnapshotRequest(0);
RaftGroupService service =
- new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+ new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
CompletableFuture<Void> fut = service.snapshot(new Peer("localhost:8082"));
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
index 052f35d..05b8a56 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
@@ -34,9 +34,7 @@ import java.util.stream.Collectors;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
-import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
-import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.jraft.CliService;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
@@ -70,29 +68,21 @@ public class ITCliServiceTest {
/**
* The logger.
*/
- static final Logger LOG = LoggerFactory.getLogger(ITCliServiceTest.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ITCliServiceTest.class);
- /**
- * The registry.
- */
- private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
-
- /**
- * The message factory.
- */
- private final static ScaleCubeClusterServiceFactory factory = new TestScaleCubeClusterServiceFactory();
+ private static final int LEARNER_PORT_STEP = 100;
private String dataPath;
private TestCluster cluster;
+
private final String groupId = "CliServiceTest";
private CliService cliService;
private Configuration conf;
- private static final int LEARNER_PORT_STEP = 100;
-
+ /** */
@BeforeEach
public void setup(TestInfo testInfo) throws Exception {
LOG.info(">>>>>>>>>>>>>>> Start test method: " + testInfo.getDisplayName());
@@ -130,11 +120,26 @@ public class ITCliServiceTest {
CliOptions opts = new CliOptions();
opts.setClientExecutor(JRaftUtils.createClientExecutor(opts, "client"));
- ClusterService clientSvc = factory.createClusterService(new ClusterLocalConfiguration("client",
- TestUtils.INIT_PORT - 1, peers.stream().map(p -> p.getEndpoint().toString()).collect(Collectors.toList()),
- SERIALIZATION_REGISTRY));
+ List<String> memberAddresses = peers.stream().map(p -> p.getEndpoint().toString()).collect(Collectors.toList());
+
+ var registry = new MessageSerializationRegistryImpl();
+
+ var serviceConfig = new ClusterLocalConfiguration("client", TestUtils.INIT_PORT - 1, memberAddresses, registry);
+
+ var factory = new TestScaleCubeClusterServiceFactory();
+
+ ClusterService clientSvc = factory.createClusterService(serviceConfig);
+
+ clientSvc.start();
+
+ IgniteRpcClient rpcClient = new IgniteRpcClient(clientSvc) {
+ @Override public void shutdown() {
+ super.shutdown();
+
+ clientSvc.shutdown();
+ }
+ };
- IgniteRpcClient rpcClient = new IgniteRpcClient(clientSvc, false);
opts.setRpcClient(rpcClient);
assertTrue(cliService.init(opts));
}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
index 80c54e6..159bba9 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
@@ -35,10 +35,14 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import com.codahale.metrics.ConsoleReporter;
+import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.Iterator;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
@@ -101,20 +105,11 @@ import static org.junit.jupiter.api.Assertions.fail;
* Integration tests for raft cluster.
*/
public class ITNodeTest {
- static final Logger LOG = LoggerFactory.getLogger(ITNodeTest.class);
-
- private String dataPath;
-
- private final AtomicInteger startedCounter = new AtomicInteger(0);
- private final AtomicInteger stoppedCounter = new AtomicInteger(0);
-
- private long testStartMs;
+ private static final Logger LOG = LoggerFactory.getLogger(ITNodeTest.class);
private static DumpThread dumpThread;
- private TestCluster cluster;
-
- static class DumpThread extends Thread {
+ private static class DumpThread extends Thread {
private static long DUMP_TIMEOUT_MS = 5 * 60 * 1000;
private volatile boolean stopped = false;
@@ -135,6 +130,18 @@ public class ITNodeTest {
}
}
+ private String dataPath;
+
+ private final AtomicInteger startedCounter = new AtomicInteger(0);
+
+ private final AtomicInteger stoppedCounter = new AtomicInteger(0);
+
+ private long testStartMs;
+
+ private TestCluster cluster;
+
+ private final List<RaftGroupService> services = new ArrayList<>();
+
@BeforeAll
public static void setupNodeTest() {
dumpThread = new DumpThread();
@@ -167,6 +174,14 @@ public class ITNodeTest {
@AfterEach
public void teardown(TestInfo testInfo) throws Exception {
+ services.forEach(service -> {
+ try {
+ service.shutdown();
+ } catch (Exception e) {
+ LOG.error("Error while closing a service", e);
+ }
+ });
+
if (cluster != null)
cluster.stopAll();
@@ -190,8 +205,6 @@ public class ITNodeTest {
RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions);
service.start(true);
-
- service.shutdown();
}
@Test
@@ -235,13 +248,9 @@ public class ITNodeTest {
node.apply(task);
tasks.add(task);
}
- try {
- Task.joinAll(tasks, TimeUnit.SECONDS.toMillis(30));
- assertEquals(10, c.get());
- }
- finally {
- service.shutdown();
- }
+
+ Task.joinAll(tasks, TimeUnit.SECONDS.toMillis(30));
+ assertEquals(10, c.get());
}
/**
@@ -349,8 +358,6 @@ public class ITNodeTest {
// No read-index request succeed.
assertEquals(0, readIndexSuccesses.get());
assertTrue(n - 1 >= currentValue.get());
-
- service.shutdown();
}
@Test
@@ -380,7 +387,6 @@ public class ITNodeTest {
int i = 0;
for (ByteBuffer data : fsm.getLogs())
assertEquals("hello" + i++, new String(data.array()));
- service.shutdown();
}
@Test
@@ -1520,16 +1526,14 @@ public class ITNodeTest {
NodeImpl follower0 = (NodeImpl) follower;
DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService();
RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient();
- rpcClientEx.blockMessages(new BiPredicate<>() {
- @Override public boolean test(Object msg, String nodeId) {
- if (msg instanceof RpcRequests.RequestVoteRequest) {
- RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest)msg;
-
- return !msg0.getPreVote();
- }
+ rpcClientEx.blockMessages((msg, nodeId) -> {
+ if (msg instanceof RpcRequests.RequestVoteRequest) {
+ RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest)msg;
- return false;
+ return !msg0.getPreVote();
}
+
+ return false;
});
}
@@ -2273,8 +2277,6 @@ public class ITNodeTest {
node.snapshot(new ExpectClosure(RaftError.EINVAL, "Snapshot is not supported", latch));
waitLatch(latch);
assertEquals(0, fsm.getSaveSnapshotTimes());
-
- service.shutdown();
}
@Test
@@ -2302,8 +2304,6 @@ public class ITNodeTest {
int times = fsm.getSaveSnapshotTimes();
assertTrue(times >= 1, "snapshotTimes=" + times);
assertTrue(fsm.getSnapshotIndex() > 0);
-
- service.shutdown();
}
@Test
@@ -2551,10 +2551,6 @@ public class ITNodeTest {
catch (Exception e) {
// Expected.
}
- finally {
- service.shutdown();
- }
-
}
}
@@ -2906,22 +2902,18 @@ public class ITNodeTest {
nodeOpts.setFsm(fsm);
RaftGroupService service = createService("test", new PeerId(addr, 0), nodeOpts);
- try {
- Node node = service.start(true);
- assertEquals(26, fsm.getLogs().size());
- for (int i = 0; i < 26; i++)
- assertEquals('a' + i, fsm.getLogs().get(i).get());
+ Node node = service.start(true);
+ assertEquals(26, fsm.getLogs().size());
- // Group configuration will be restored from snapshot meta.
- while (!node.isLeader())
- Thread.sleep(20);
- sendTestTaskAndWait(node);
- assertEquals(36, fsm.getLogs().size());
- }
- finally {
- service.shutdown();
- }
+ for (int i = 0; i < 26; i++)
+ assertEquals('a' + i, fsm.getLogs().get(i).get());
+
+ // Group configuration will be restored from snapshot meta.
+ while (!node.isLeader())
+ Thread.sleep(20);
+ sendTestTaskAndWait(node);
+ assertEquals(36, fsm.getLogs().size());
}
@Test
@@ -2948,16 +2940,12 @@ public class ITNodeTest {
nodeOpts.setFsm(fsm);
RaftGroupService service = createService("test", new PeerId(addr, 0), nodeOpts);
- try {
- Node node = service.start(true);
- while (!node.isLeader())
- Thread.sleep(20);
- sendTestTaskAndWait(node);
- assertEquals(10, fsm.getLogs().size());
- }
- finally {
- service.shutdown();
- }
+
+ Node node = service.start(true);
+ while (!node.isLeader())
+ Thread.sleep(20);
+ sendTestTaskAndWait(node);
+ assertEquals(10, fsm.getLogs().size());
}
@Test
@@ -3344,16 +3332,14 @@ public class ITNodeTest {
NodeImpl follower0 = (NodeImpl) follower;
DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService();
RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient();
- rpcClientEx.blockMessages(new BiPredicate<>() {
- @Override public boolean test(Object msg, String nodeId) {
- if (msg instanceof RpcRequests.RequestVoteRequest) {
- RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest)msg;
-
- return !msg0.getPreVote();
- }
+ rpcClientEx.blockMessages((msg, nodeId) -> {
+ if (msg instanceof RpcRequests.RequestVoteRequest) {
+ RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest)msg;
- return false;
+ return !msg0.getPreVote();
}
+
+ return false;
});
}
@@ -3453,23 +3439,49 @@ public class ITNodeTest {
* @return Raft group service.
*/
private RaftGroupService createService(String groupId, PeerId peerId, NodeOptions nodeOptions) {
- NodeManager nodeManager = new NodeManager();
-
- List<String> servers = new ArrayList<>();
-
Configuration initialConf = nodeOptions.getInitialConf();
- if (initialConf != null) {
- for (PeerId id : initialConf.getPeers())
- servers.add(id.getEndpoint().toString());
+ var servers = List.<String>of();
- for (PeerId id : initialConf.getLearners())
- servers.add(id.getEndpoint().toString());
+ if (initialConf != null) {
+ servers = Stream.concat(initialConf.getPeers().stream(), initialConf.getLearners().stream())
+ .map(id -> id.getEndpoint().toString())
+ .collect(Collectors.toList());
}
- IgniteRpcServer rpcServer = new TestIgniteRpcServer(peerId.getEndpoint(), servers, nodeManager);
- nodeOptions.setRpcClient(new IgniteRpcClient(rpcServer.clusterService(), true));
+ var nodeManager = new NodeManager();
+
+ ClusterService clusterService = createClusterService(peerId.getEndpoint(), servers);
+
+ IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, servers, nodeManager);
+
+ nodeOptions.setRpcClient(new IgniteRpcClient(clusterService));
+
+ clusterService.start();
+
+ var service = new RaftGroupService(groupId, peerId, nodeOptions, rpcServer, nodeManager) {
+ @Override public synchronized void shutdown() {
+ super.shutdown();
+
+ clusterService.shutdown();
+ }
+ };
+
+ services.add(service);
+
+ return service;
+ }
+
+ /**
+ * Creates a non-started {@link ClusterService}.
+ */
+ private static ClusterService createClusterService(Endpoint endpoint, List<String> members) {
+ var registry = new TestMessageSerializationRegistryImpl();
+
+ var clusterConfig = new ClusterLocalConfiguration(endpoint.toString(), endpoint.getPort(), members, registry);
+
+ var clusterServiceFactory = new TestScaleCubeClusterServiceFactory();
- return new RaftGroupService(groupId, peerId, nodeOptions, rpcServer, nodeManager);
+ return clusterServiceFactory.createClusterService(clusterConfig);
}
}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
index 06c949b..c07a815 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
@@ -102,12 +102,12 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
/**
* Servers list.
*/
- protected List<JRaftServerImpl> servers = new ArrayList<>();
+ protected final List<JRaftServerImpl> servers = new ArrayList<>();
/**
* Clients list.
*/
- protected List<RaftGroupService> clients = new ArrayList<>();
+ private final List<RaftGroupService> clients = new ArrayList<>();
/**
* Data path.
@@ -146,9 +146,15 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
*/
private JRaftServerImpl startServer(int idx, Consumer<RaftServer> clo) {
ClusterService service = clusterService("server" + idx, PORT + idx,
- List.of(getLocalAddress() + ":" + PORT), false);
+ List.of(getLocalAddress() + ":" + PORT), true);
- JRaftServerImpl server = new JRaftServerImpl(service, dataPath, FACTORY, false);
+ JRaftServerImpl server = new JRaftServerImpl(service, dataPath, FACTORY) {
+ @Override public void shutdown() throws Exception {
+ super.shutdown();
+
+ service.shutdown();
+ }
+ };
clo.accept(server);
@@ -166,11 +172,17 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
private RaftGroupService startClient(String groupId) {
String addr = getLocalAddress() + ":" + PORT;
- ClusterService clientNode1 = clusterService("client_" + groupId + "_", CLIENT_PORT + clients.size(),
- List.of(addr), false);
+ ClusterService clientNode = clusterService("client_" + groupId + "_", CLIENT_PORT + clients.size(),
+ List.of(addr), true);
- RaftGroupServiceImpl client = new RaftGroupServiceImpl(groupId, clientNode1, FACTORY, 10_000,
- List.of(new Peer(addr)), false, 200, false);
+ RaftGroupServiceImpl client = new RaftGroupServiceImpl(groupId, clientNode, FACTORY, 10_000,
+ List.of(new Peer(addr)), false, 200) {
+ @Override public void shutdown() {
+ super.shutdown();
+
+ clientNode.shutdown();
+ }
+ };
clients.add(client);
@@ -345,7 +357,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
public void testApplyWithFailure() throws Exception {
listenerFactory = () -> new CounterListener() {
@Override public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
- Iterator<CommandClosure<WriteCommand>> wrapper = new Iterator<CommandClosure<WriteCommand>>() {
+ Iterator<CommandClosure<WriteCommand>> wrapper = new Iterator<>() {
@Override public boolean hasNext() {
return iterator.hasNext();
}
@@ -353,7 +365,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
@Override public CommandClosure<WriteCommand> next() {
CommandClosure<WriteCommand> cmd = iterator.next();
- IncrementAndGetCommand command = (IncrementAndGetCommand) cmd.command();
+ IncrementAndGetCommand command = (IncrementAndGetCommand)cmd.command();
if (command.delta() == 10)
throw new IgniteInternalException("Very bad");
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITSimpleCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITSimpleCounterServerTest.java
index 6296836..efb701c 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITSimpleCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITSimpleCounterServerTest.java
@@ -19,12 +19,12 @@ package org.apache.ignite.raft.server;
import java.util.List;
import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
-import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -74,24 +74,42 @@ class ITSimpleCounterServerTest extends RaftServerAbstractTest {
String id = "localhost:" + PORT;
- ClusterService service = clusterService(id, PORT, List.of(), false);
+ ClusterService service = clusterService(id, PORT, List.of(), true);
+
+ server = new RaftServerImpl(service, FACTORY) {
+ @Override public synchronized void shutdown() throws Exception {
+ super.shutdown();
- server = new RaftServerImpl(service, FACTORY, false);
+ service.shutdown();
+ }
+ };
- ClusterNode serverNode = this.server.clusterService().topologyService().localMember();
+ ClusterNode serverNode = server.clusterService().topologyService().localMember();
- this.server.startRaftGroup(COUNTER_GROUP_ID_0, new CounterListener(), List.of(new Peer(serverNode.address())));
- this.server.startRaftGroup(COUNTER_GROUP_ID_1, new CounterListener(), List.of(new Peer(serverNode.address())));
+ server.startRaftGroup(COUNTER_GROUP_ID_0, new CounterListener(), List.of(new Peer(serverNode.address())));
+ server.startRaftGroup(COUNTER_GROUP_ID_1, new CounterListener(), List.of(new Peer(serverNode.address())));
- ClusterService clientNode1 = clusterService("localhost:" + (PORT + 1), PORT + 1, List.of(id), false);
+ ClusterService clientNode1 = clusterService("localhost:" + (PORT + 1), PORT + 1, List.of(id), true);
client1 = new RaftGroupServiceImpl(COUNTER_GROUP_ID_0, clientNode1, FACTORY, 1000,
- List.of(new Peer(serverNode.address())), false, 200, false);
+ List.of(new Peer(serverNode.address())), false, 200) {
+ @Override public void shutdown() {
+ super.shutdown();
- ClusterService clientNode2 = clusterService("localhost:" + (PORT + 2), PORT + 2, List.of(id), false);
+ clientNode1.shutdown();
+ }
+ };
+
+ ClusterService clientNode2 = clusterService("localhost:" + (PORT + 2), PORT + 2, List.of(id), true);
client2 = new RaftGroupServiceImpl(COUNTER_GROUP_ID_1, clientNode2, FACTORY, 1000,
- List.of(new Peer(serverNode.address())), false, 200, false);
+ List.of(new Peer(serverNode.address())), false, 200) {
+ @Override public void shutdown() {
+ super.shutdown();
+
+ clientNode2.shutdown();
+ }
+ };
assertTrue(waitForTopology(service, 2, 1000));
assertTrue(waitForTopology(clientNode1, 2, 1000));
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index c6dc324..bfa8066 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -56,7 +56,7 @@ public class Loza {
public Loza(ClusterService clusterNetSvc) {
this.clusterNetSvc = clusterNetSvc;
- this.raftServer = new RaftServerImpl(clusterNetSvc, FACTORY, true);
+ this.raftServer = new RaftServerImpl(clusterNetSvc, FACTORY);
}
/**
@@ -82,8 +82,7 @@ public class Loza {
TIMEOUT,
peers.stream().map(i -> new Peer(i.address())).collect(Collectors.toList()),
true,
- DELAY,
- true
+ DELAY
);
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
index 1d43aed..e29af9b 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
@@ -78,24 +78,21 @@ public class JRaftServerImpl implements RaftServer {
* @param service Cluster service.
* @param dataPath Data path.
* @param factory The factory.
- * @param reuse {@code True} to reuse cluster service (do not manage lifecyle)
*/
- public JRaftServerImpl(ClusterService service, String dataPath, RaftClientMessagesFactory factory, boolean reuse) {
- this(service, dataPath, factory, reuse, new NodeOptions());
+ public JRaftServerImpl(ClusterService service, String dataPath, RaftClientMessagesFactory factory) {
+ this(service, dataPath, factory, new NodeOptions());
}
/**
* @param service Cluster service.
* @param dataPath Data path.
* @param factory The factory.
- * @param reuse {@code True} to reuse cluster service (do not manage lifecyle)
* @param opts Default node options.
*/
public JRaftServerImpl(
ClusterService service,
String dataPath,
RaftClientMessagesFactory factory,
- boolean reuse,
NodeOptions opts
) {
this.service = service;
@@ -103,7 +100,7 @@ public class JRaftServerImpl implements RaftServer {
this.nodeManager = new NodeManager();
this.opts = opts;
- assert !reuse || service.topologyService().localMember() != null;
+ assert service.topologyService().localMember() != null;
if (opts.getServerName() == null)
opts.setServerName(service.localConfiguration().getName());
@@ -120,7 +117,7 @@ public class JRaftServerImpl implements RaftServer {
if (opts.getClientExecutor() == null)
opts.setClientExecutor(JRaftUtils.createClientExecutor(opts, opts.getServerName()));
- rpcServer = new IgniteRpcServer(service, reuse, nodeManager, factory, JRaftUtils.createRequestExecutor(opts));
+ rpcServer = new IgniteRpcServer(service, nodeManager, factory, JRaftUtils.createRequestExecutor(opts));
rpcServer.init(null);
}
@@ -164,14 +161,12 @@ public class JRaftServerImpl implements RaftServer {
nodeOptions.setFsm(new DelegatingStateMachine(lsnr));
if (initialConf != null) {
- List<PeerId> mapped = initialConf.stream().map(p -> {
- return PeerId.fromPeer(p);
- }).collect(Collectors.toList());
+ List<PeerId> mapped = initialConf.stream().map(PeerId::fromPeer).collect(Collectors.toList());
nodeOptions.setInitialConf(new Configuration(mapped, null));
}
- IgniteRpcClient client = new IgniteRpcClient(service, true);
+ IgniteRpcClient client = new IgniteRpcClient(service);
nodeOptions.setRpcClient(client);
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
index 1ac9972..1139159 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
@@ -76,25 +76,16 @@ public class RaftServerImpl implements RaftServer {
/** */
private final Thread writeWorker;
- /** */
- private final boolean reuse;
-
/**
* @param service Network service.
* @param clientMsgFactory Client message factory.
- * @param reuse {@code True} to reuse cluster service.
*/
- public RaftServerImpl(
- ClusterService service,
- RaftClientMessagesFactory clientMsgFactory,
- boolean reuse
- ) {
+ public RaftServerImpl(ClusterService service, RaftClientMessagesFactory clientMsgFactory) {
Objects.requireNonNull(service);
Objects.requireNonNull(clientMsgFactory);
this.service = service;
this.clientMsgFactory = clientMsgFactory;
- this.reuse = reuse;
readQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
writeQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
@@ -124,9 +115,6 @@ public class RaftServerImpl implements RaftServer {
// TODO https://issues.apache.org/jira/browse/IGNITE-14775
});
- if (!reuse)
- service.start();
-
readWorker = new Thread(() -> processQueue(readQueue, RaftGroupListener::onRead), "read-cmd-worker#" + service.topologyService().localMember().toString());
readWorker.setDaemon(true);
readWorker.start();
@@ -172,9 +160,6 @@ public class RaftServerImpl implements RaftServer {
writeWorker.interrupt();
writeWorker.join();
- if (!reuse)
- service.shutdown();
-
LOG.info("Stopped replication server [node=" + service.toString() + ']');
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
index f0d0495..6d23508 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
-import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.TopologyEventHandler;
@@ -47,18 +46,11 @@ public class IgniteRpcClient implements RpcClientEx {
private final ClusterService service;
- private final boolean reuse;
-
/**
* @param service The service.
- * @param reuse {@code True} to reuse already started service.
*/
- public IgniteRpcClient(ClusterService service, boolean reuse) {
+ public IgniteRpcClient(ClusterService service) {
this.service = service;
- this.reuse = reuse;
-
- if (!reuse) // TODO asch use init
- service.start();
}
public ClusterService clusterService() {
@@ -181,13 +173,6 @@ public class IgniteRpcClient implements RpcClientEx {
/** {@inheritDoc} */
@Override public void shutdown() {
- try {
- if (!reuse)
- service.shutdown();
- }
- catch (Exception e) {
- throw new IgniteInternalException(e);
- }
}
/** {@inheritDoc} */
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index e5fc4e3..31404a1 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -21,11 +21,9 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
-import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.NetworkMessageHandler;
import org.apache.ignite.network.TopologyEventHandler;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.jraft.NodeManager;
@@ -59,30 +57,24 @@ public class IgniteRpcServer implements RpcServer<Void> {
/** Factory. */
private static final RaftClientMessagesFactory FACTORY = new RaftClientMessagesFactory();
- /** The {@code true} to reuse cluster service. */
- private final boolean reuse;
-
private final ClusterService service;
- private List<ConnectionClosedEventListener> listeners = new CopyOnWriteArrayList<>();
+ private final List<ConnectionClosedEventListener> listeners = new CopyOnWriteArrayList<>();
- private Map<String, RpcProcessor> processors = new ConcurrentHashMap<>();
+ private final Map<String, RpcProcessor> processors = new ConcurrentHashMap<>();
/**
* @param service The cluster service.
- * @param reuse {@code True} to reuse service (do no manage lifecycle).
* @param nodeManager The node manager.
* @param factory Message factory.
* @param rpcExecutor The executor for RPC requests.
*/
public IgniteRpcServer(
ClusterService service,
- boolean reuse,
NodeManager nodeManager,
RaftClientMessagesFactory factory,
@Nullable Executor rpcExecutor
) {
- this.reuse = reuse;
this.service = service;
// raft server RPC
@@ -112,60 +104,59 @@ public class IgniteRpcServer implements RpcServer<Void> {
registerProcessor(new ActionRequestProcessor(rpcExecutor, FACTORY));
registerProcessor(new org.apache.ignite.raft.jraft.rpc.impl.client.SnapshotRequestProcessor(rpcExecutor, FACTORY));
- service.messagingService().addMessageHandler(new NetworkMessageHandler() {
- @Override public void onReceived(NetworkMessage msg, ClusterNode sender, String corellationId) {
- Class<? extends NetworkMessage> cls = msg.getClass();
- RpcProcessor prc = processors.get(cls.getName());
+ service.messagingService().addMessageHandler((msg, sender, corellationId) -> {
+ Class<? extends NetworkMessage> cls = msg.getClass();
+ RpcProcessor<NetworkMessage> prc = processors.get(cls.getName());
- // TODO asch cache mapping https://issues.apache.org/jira/browse/IGNITE-14832
- if (prc == null) {
- for (Class<?> iface : cls.getInterfaces()) {
- prc = processors.get(iface.getName());
+ // TODO asch cache mapping https://issues.apache.org/jira/browse/IGNITE-14832
+ if (prc == null) {
+ for (Class<?> iface : cls.getInterfaces()) {
+ prc = processors.get(iface.getName());
- if (prc != null)
- break;
- }
+ if (prc != null)
+ break;
}
+ }
- if (prc == null)
- return;
+ if (prc == null)
+ return;
- RpcProcessor.ExecutorSelector selector = prc.executorSelector();
+ RpcProcessor.ExecutorSelector selector = prc.executorSelector();
- Executor executor = null;
+ Executor executor = null;
- if (selector != null) {
- executor = selector.select(prc.getClass().getName(), msg, nodeManager);
- }
+ if (selector != null)
+ executor = selector.select(prc.getClass().getName(), msg, nodeManager);
- if (executor == null)
- executor = prc.executor();
+ if (executor == null)
+ executor = prc.executor();
- if (executor == null)
- executor = rpcExecutor;
+ if (executor == null)
+ executor = rpcExecutor;
- RpcProcessor finalPrc = prc;
+ RpcProcessor<NetworkMessage> finalPrc = prc;
- executor.execute(() -> {
- finalPrc.handleRequest(new RpcContext() {
- @Override public NodeManager getNodeManager() {
- return nodeManager;
- }
+ executor.execute(() -> {
+ var context = new RpcContext() {
+ @Override public NodeManager getNodeManager() {
+ return nodeManager;
+ }
- @Override public void sendResponse(Object responseObj) {
- service.messagingService().send(sender, (NetworkMessage) responseObj, corellationId);
- }
+ @Override public void sendResponse(Object responseObj) {
+ service.messagingService().send(sender, (NetworkMessage) responseObj, corellationId);
+ }
- @Override public String getRemoteAddress() {
- return sender.address();
- }
+ @Override public String getRemoteAddress() {
+ return sender.address();
+ }
- @Override public String getLocalAddress() {
- return service.topologyService().localMember().address();
- }
- }, msg);
- });
- }
+ @Override public String getLocalAddress() {
+ return service.topologyService().localMember().address();
+ }
+ };
+
+ finalPrc.handleRequest(context, msg);
+ });
});
service.topologyService().addEventHandler(new TopologyEventHandler() {
@@ -180,23 +171,24 @@ public class IgniteRpcServer implements RpcServer<Void> {
});
}
+ /** {@inheritDoc} */
@Override public void registerConnectionClosedEventListener(ConnectionClosedEventListener listener) {
- if (!listeners.contains(listeners))
+ if (!listeners.contains(listener))
listeners.add(listener);
}
+ /** {@inheritDoc} */
@Override public void registerProcessor(RpcProcessor<?> processor) {
processors.put(processor.interest(), processor);
}
+ /** {@inheritDoc} */
@Override public int boundPort() {
return 0;
}
+ /** {@inheritDoc} */
@Override public boolean init(Void opts) {
- if (!reuse)
- service.start();
-
return true;
}
@@ -204,15 +196,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
return service;
}
+ /** {@inheritDoc} */
@Override public void shutdown() {
- if (reuse)
- return;
-
- try {
- service.shutdown();
- }
- catch (Exception e) {
- throw new IgniteInternalException(e);
- }
}
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 94a2efe..92fe82a 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -30,6 +30,10 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.JRaftServiceFactory;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
@@ -41,7 +45,6 @@ import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.rpc.TestIgniteRpcServer;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
-import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Endpoint;
@@ -64,7 +67,7 @@ public class TestCluster {
private final List<PeerId> peers;
private final List<NodeImpl> nodes;
private final LinkedHashMap<PeerId, MockStateMachine> fsms;
- private final ConcurrentMap<String, RaftGroupService> serverMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Endpoint, RaftGroupService> serverMap = new ConcurrentHashMap<>();
private final int electionTimeoutMs;
private final Lock lock = new ReentrantLock();
private final Consumer<NodeOptions> optsClo;
@@ -160,68 +163,93 @@ public class TestCluster {
final boolean enableMetrics, final SnapshotThrottle snapshotThrottle,
final RaftOptions raftOptions, final int priority) throws IOException {
- if (this.serverMap.get(listenAddr.toString()) != null) {
- return true;
- }
+ this.lock.lock();
+ try {
+ if (this.serverMap.get(listenAddr) != null) {
+ return true;
+ }
- final NodeOptions nodeOptions = new NodeOptions();
+ final NodeOptions nodeOptions = new NodeOptions();
- nodeOptions.setServerName(listenAddr.toString());
+ nodeOptions.setServerName(listenAddr.toString());
- nodeOptions.setCommonExecutor(JRaftUtils.createCommonExecutor(nodeOptions));
- nodeOptions.setStripedExecutor(JRaftUtils.createAppendEntriesExecutor(nodeOptions));
- nodeOptions.setClientExecutor(JRaftUtils.createClientExecutor(nodeOptions, nodeOptions.getServerName()));
- nodeOptions.setScheduler(JRaftUtils.createScheduler(nodeOptions));
+ nodeOptions.setCommonExecutor(JRaftUtils.createCommonExecutor(nodeOptions));
+ nodeOptions.setStripedExecutor(JRaftUtils.createAppendEntriesExecutor(nodeOptions));
+ nodeOptions.setClientExecutor(JRaftUtils.createClientExecutor(nodeOptions, nodeOptions.getServerName()));
+ nodeOptions.setScheduler(JRaftUtils.createScheduler(nodeOptions));
- nodeOptions.setElectionTimeoutMs(this.electionTimeoutMs);
- nodeOptions.setEnableMetrics(enableMetrics);
- nodeOptions.setSnapshotThrottle(snapshotThrottle);
- nodeOptions.setSnapshotIntervalSecs(snapshotIntervalSecs);
- nodeOptions.setServiceFactory(this.raftServiceFactory);
- if (raftOptions != null) {
- nodeOptions.setRaftOptions(raftOptions);
- }
- final String serverDataPath = this.dataPath + File.separator + listenAddr.toString().replace(':', '_');
- new File(serverDataPath).mkdirs();
- nodeOptions.setLogUri(serverDataPath + File.separator + "logs");
- nodeOptions.setRaftMetaUri(serverDataPath + File.separator + "meta");
- nodeOptions.setSnapshotUri(serverDataPath + File.separator + "snapshot");
- nodeOptions.setElectionPriority(priority);
-
- final MockStateMachine fsm = new MockStateMachine(listenAddr);
- nodeOptions.setFsm(fsm);
-
- if (!emptyPeers) {
- nodeOptions.setInitialConf(new Configuration(this.peers, this.learners));
- }
+ nodeOptions.setElectionTimeoutMs(this.electionTimeoutMs);
+ nodeOptions.setEnableMetrics(enableMetrics);
+ nodeOptions.setSnapshotThrottle(snapshotThrottle);
+ nodeOptions.setSnapshotIntervalSecs(snapshotIntervalSecs);
+ nodeOptions.setServiceFactory(this.raftServiceFactory);
+ if (raftOptions != null) {
+ nodeOptions.setRaftOptions(raftOptions);
+ }
+ final String serverDataPath = this.dataPath + File.separator + listenAddr.toString().replace(':', '_');
+ new File(serverDataPath).mkdirs();
+ nodeOptions.setLogUri(serverDataPath + File.separator + "logs");
+ nodeOptions.setRaftMetaUri(serverDataPath + File.separator + "meta");
+ nodeOptions.setSnapshotUri(serverDataPath + File.separator + "snapshot");
+ nodeOptions.setElectionPriority(priority);
+
+ final MockStateMachine fsm = new MockStateMachine(listenAddr);
+ nodeOptions.setFsm(fsm);
+
+ if (!emptyPeers) {
+ nodeOptions.setInitialConf(new Configuration(this.peers, this.learners));
+ }
- List<String> servers = emptyPeers ? List.of() : this.peers.stream().map(p -> p.getEndpoint().toString()).collect(Collectors.toList());
+ List<String> servers = emptyPeers ? List.of() : this.peers.stream().map(p -> p.getEndpoint().toString()).collect(Collectors.toList());
- NodeManager nodeManager = new NodeManager();
+ NodeManager nodeManager = new NodeManager();
- final IgniteRpcServer rpcServer = new TestIgniteRpcServer(listenAddr, servers, nodeManager);
- nodeOptions.setRpcClient(new IgniteRpcClient(rpcServer.clusterService(), true));
+ ClusterService clusterService = createClusterService(listenAddr, servers);
- if (optsClo != null)
- optsClo.accept(nodeOptions);
+ var rpcClient = new IgniteRpcClient(clusterService);
- final RaftGroupService server = new RaftGroupService(this.name, new PeerId(listenAddr, 0, priority),
- nodeOptions, rpcServer, nodeManager);
+ nodeOptions.setRpcClient(rpcClient);
- this.lock.lock();
- try {
- if (this.serverMap.put(listenAddr.toString(), server) == null) {
- final Node node = server.start();
+ var rpcServer = new TestIgniteRpcServer(clusterService, servers, nodeManager);
- this.fsms.put(new PeerId(listenAddr, 0), fsm);
- this.nodes.add((NodeImpl) node);
- return true;
- }
+ clusterService.start();
+
+ if (optsClo != null)
+ optsClo.accept(nodeOptions);
+
+ final RaftGroupService server = new RaftGroupService(this.name, new PeerId(listenAddr, 0, priority),
+ nodeOptions, rpcServer, nodeManager) {
+ @Override public synchronized void shutdown() {
+ super.shutdown();
+
+ clusterService.shutdown();
+ }
+ };
+
+ this.serverMap.put(listenAddr, server);
+
+ final Node node = server.start();
+
+ this.fsms.put(new PeerId(listenAddr, 0), fsm);
+ this.nodes.add((NodeImpl) node);
+ return true;
}
finally {
this.lock.unlock();
}
- return false;
+ }
+
+ /**
+ * Creates a non-started {@link ClusterService}.
+ */
+ private static ClusterService createClusterService(Endpoint endpoint, List<String> members) {
+ var registry = new TestMessageSerializationRegistryImpl();
+
+ var clusterConfig = new ClusterLocalConfiguration(endpoint.toString(), endpoint.getPort(), members, registry);
+
+ var clusterServiceFactory = new TestScaleCubeClusterServiceFactory();
+
+ return clusterServiceFactory.createClusterService(clusterConfig);
}
public Node getNode(Endpoint endpoint) {
@@ -240,7 +268,7 @@ public class TestCluster {
}
public RaftGroupService getServer(Endpoint endpoint) {
- return serverMap.get(endpoint.toString());
+ return serverMap.get(endpoint);
}
public MockStateMachine getFsmByPeer(final PeerId peer) {
@@ -265,7 +293,7 @@ public class TestCluster {
public boolean stop(final Endpoint listenAddr) throws InterruptedException {
removeNode(listenAddr);
- final RaftGroupService raftGroupService = this.serverMap.remove(listenAddr.toString());
+ final RaftGroupService raftGroupService = this.serverMap.remove(listenAddr);
raftGroupService.shutdown();
return true;
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
index 01cd57f..9948db1 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
@@ -46,7 +46,7 @@ import static org.junit.jupiter.api.Assertions.fail;
public abstract class AbstractRpcTest {
protected Endpoint endpoint;
- private final List<RpcServer<?>> servers = new ArrayList<>();
+ private RpcServer<?> server;
private final List<RpcClient> clients = new ArrayList<>();
@@ -54,18 +54,18 @@ public abstract class AbstractRpcTest {
public void setup() {
endpoint = new Endpoint(TestUtils.getLocalAddress(), INIT_PORT);
- RpcServer<?> server = createServer(endpoint);
+ server = createServer(endpoint);
+
server.registerProcessor(new Request1RpcProcessor());
server.registerProcessor(new Request2RpcProcessor());
server.init(null);
-
- servers.add(server);
}
@AfterEach
public void tearDown() {
clients.forEach(RpcClient::shutdown);
- servers.forEach(RpcServer::shutdown);
+
+ server.shutdown();
}
/**
@@ -80,6 +80,8 @@ public abstract class AbstractRpcTest {
private RpcClient createClient() {
RpcClient client = createClient0();
+ client.init(null);
+
clients.add(client);
return client;
@@ -135,7 +137,7 @@ public abstract class AbstractRpcTest {
assertTrue(client1.checkConnection(endpoint));
assertTrue(client2.checkConnection(endpoint));
- servers.get(0).shutdown();
+ server.shutdown();
assertTrue(waitForTopology(client1, 2, 5_000));
assertTrue(waitForTopology(client2, 2, 5_000));
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
index 52f1ec8..d5370b9 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
@@ -23,7 +23,6 @@ import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
-import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.util.Endpoint;
@@ -32,17 +31,24 @@ import org.apache.ignite.raft.jraft.util.Endpoint;
*
*/
public class IgniteRpcTest extends AbstractRpcTest {
- /**
- * Serialization registry.
- */
- private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
-
/** The counter. */
private final AtomicInteger cntr = new AtomicInteger();
/** {@inheritDoc} */
@Override public RpcServer<?> createServer(Endpoint endpoint) {
- return new TestIgniteRpcServer(endpoint, new NodeManager());
+ ClusterService service = createService(endpoint.toString(), endpoint.getPort(), List.of());
+
+ var server = new TestIgniteRpcServer(service, List.of(), new NodeManager()) {
+ @Override public void shutdown() {
+ super.shutdown();
+
+ service.shutdown();
+ }
+ };
+
+ service.start();
+
+ return server;
}
/** {@inheritDoc} */
@@ -51,7 +57,15 @@ public class IgniteRpcTest extends AbstractRpcTest {
ClusterService service = createService("client" + i, endpoint.getPort() - i, List.of(endpoint.toString()));
- IgniteRpcClient client = new IgniteRpcClient(service, false);
+ IgniteRpcClient client = new IgniteRpcClient(service) {
+ @Override public void shutdown() {
+ super.shutdown();
+
+ service.shutdown();
+ }
+ };
+
+ service.start();
waitForTopology(client, 1 + i, 5_000);
@@ -64,8 +78,9 @@ public class IgniteRpcTest extends AbstractRpcTest {
* @param servers Server nodes of the cluster.
* @return The client cluster view.
*/
- protected ClusterService createService(String name, int port, List<String> servers) {
- var context = new ClusterLocalConfiguration(name, port, servers, SERIALIZATION_REGISTRY);
+ private static ClusterService createService(String name, int port, List<String> servers) {
+ var registry = new MessageSerializationRegistryImpl();
+ var context = new ClusterLocalConfiguration(name, port, servers, registry);
var factory = new TestScaleCubeClusterServiceFactory();
return factory.createClusterService(context);
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
index 1671155..9052f10 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
@@ -18,63 +18,28 @@
package org.apache.ignite.raft.jraft.rpc;
import java.util.List;
-import org.apache.ignite.network.ClusterLocalConfiguration;
-import org.apache.ignite.network.MessageSerializationRegistryImpl;
-import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
-import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
-import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.network.ClusterService;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
-import org.apache.ignite.raft.jraft.util.Endpoint;
/**
* RPC server configured for integration tests.
*/
public class TestIgniteRpcServer extends IgniteRpcServer {
/**
- * The registry.
+ * @param clusterService cluster service
+ * @param servers server list
+ * @param nodeManager node manager
*/
- /** */
- private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
-
- /**
- * Service factory.
- */
- private final static ScaleCubeClusterServiceFactory SERVICE_FACTORY = new TestScaleCubeClusterServiceFactory();
-
- /**
- * Message factory.
- */
- private static final RaftClientMessagesFactory MSG_FACTORY = new RaftClientMessagesFactory();
-
- /**
- * @param endpoint The endpoint.
- * @param nodeManager The node manager.
- */
- public TestIgniteRpcServer(Endpoint endpoint, NodeManager nodeManager) {
- this(endpoint.getIp() + ":" + endpoint.getPort(), endpoint.getPort(), List.of(), nodeManager);
- }
-
- /**
- * @param endpoint The endpoint.
- * @param servers Server list.
- * @param nodeManager The node manager.
- */
- public TestIgniteRpcServer(Endpoint endpoint, List<String> servers, NodeManager nodeManager) {
- this(endpoint.getIp() + ":" + endpoint.getPort(), endpoint.getPort(), servers, nodeManager);
- }
-
- /**
- * @param name The name.
- * @param port The port.
- * @param servers Server list.
- * @param nodeManager The node manager.
- */
- public TestIgniteRpcServer(String name, int port, List<String> servers, NodeManager nodeManager) {
- super(SERVICE_FACTORY.createClusterService(new ClusterLocalConfiguration(name, port, servers, SERIALIZATION_REGISTRY)),
- false, nodeManager, MSG_FACTORY, JRaftUtils.createRequestExecutor(new NodeOptions()));
+ public TestIgniteRpcServer(ClusterService clusterService, List<String> servers, NodeManager nodeManager) {
+ super(
+ clusterService,
+ nodeManager,
+ new RaftClientMessagesFactory(),
+ JRaftUtils.createRequestExecutor(new NodeOptions())
+ );
}
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index cab6b97..d6cec07 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -158,17 +158,13 @@ public class ITDistributedTableTest {
public void partitionListener() throws Exception {
String grpId = "part";
- RaftServer partSrv = new RaftServerImpl(
- cluster.get(0),
- FACTORY,
- true
- );
+ RaftServer partSrv = new RaftServerImpl(cluster.get(0), FACTORY);
List<Peer> conf = List.of(new Peer(cluster.get(0).topologyService().localMember().address()));
partSrv.startRaftGroup(grpId, new PartitionListener(), conf);
- RaftGroupService partRaftGrp = new RaftGroupServiceImpl(grpId, client, FACTORY, 10_000, conf, true, 200, true);
+ RaftGroupService partRaftGrp = new RaftGroupServiceImpl(grpId, client, FACTORY, 10_000, conf, true, 200);
Row testRow = getTestRow();
@@ -222,13 +218,8 @@ public class ITDistributedTableTest {
public void partitionedTable() {
HashMap<ClusterNode, RaftServer> raftServers = new HashMap<>(NODES);
- for (int i = 0; i < NODES; i++) {
- raftServers.put(cluster.get(i).topologyService().localMember(), new RaftServerImpl(
- cluster.get(i),
- FACTORY,
- true
- ));
- }
+ for (int i = 0; i < NODES; i++)
+ raftServers.put(cluster.get(i).topologyService().localMember(), new RaftServerImpl(cluster.get(i), FACTORY));
List<List<ClusterNode>> assignment = RendezvousAffinityFunction.assignPartitions(
cluster.stream().map(node -> node.topologyService().localMember()).collect(Collectors.toList()),
@@ -251,7 +242,7 @@ public class ITDistributedTableTest {
rs.startRaftGroup(grpId, new PartitionListener(), conf);
- partMap.put(p, new RaftGroupServiceImpl(grpId, client, FACTORY, 10_000, conf, true, 200, true));
+ partMap.put(p, new RaftGroupServiceImpl(grpId, client, FACTORY, 10_000, conf, true, 200));
p++;
}