You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/06/21 07:09:12 UTC

[ignite-3] branch main updated: IGNITE-14918 Externalize ClusterService lifecycle management - Fixes #183.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ccdb7d  IGNITE-14918 Externalize ClusterService lifecycle management - Fixes #183.
7ccdb7d is described below

commit 7ccdb7d344bcbdb56a074ef41f4e206ec4da79c3
Author: Aleksandr Polovtsev <al...@gmail.com>
AuthorDate: Mon Jun 21 10:05:01 2021 +0300

    IGNITE-14918 Externalize ClusterService lifecycle management - Fixes #183.
    
    Signed-off-by: Alexey Scherbakov <al...@gmail.com>
---
 .../client/ITMetaStorageServiceTest.java           |   9 +-
 .../scalecube/ScaleCubeClusterServiceFactory.java  |   4 +
 .../client/service/impl/RaftGroupServiceImpl.java  |  13 +-
 .../raft/client/service/RaftGroupServiceTest.java  |  26 +--
 .../ignite/raft/jraft/core/ITCliServiceTest.java   |  41 ++---
 .../apache/ignite/raft/jraft/core/ITNodeTest.java  | 180 +++++++++++----------
 .../raft/server/ITJRaftCounterServerTest.java      |  32 ++--
 .../raft/server/ITSimpleCounterServerTest.java     |  38 +++--
 .../java/org/apache/ignite/internal/raft/Loza.java |   5 +-
 .../internal/raft/server/impl/JRaftServerImpl.java |  17 +-
 .../internal/raft/server/impl/RaftServerImpl.java  |  17 +-
 .../raft/jraft/rpc/impl/IgniteRpcClient.java       |  17 +-
 .../raft/jraft/rpc/impl/IgniteRpcServer.java       | 110 ++++++-------
 .../apache/ignite/raft/jraft/core/TestCluster.java | 130 +++++++++------
 .../ignite/raft/jraft/rpc/AbstractRpcTest.java     |  14 +-
 .../ignite/raft/jraft/rpc/IgniteRpcTest.java       |  35 ++--
 .../ignite/raft/jraft/rpc/TestIgniteRpcServer.java |  57 ++-----
 .../ignite/distributed/ITDistributedTableTest.java |  19 +--
 18 files changed, 374 insertions(+), 390 deletions(-)

diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
index b7fc033..aae7d25 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
@@ -1013,11 +1013,7 @@ public class ITMetaStorageServiceTest {
     private MetaStorageService prepareMetaStorage(KeyValueStorage keyValStorageMock) {
         List<Peer> peers = List.of(new Peer(cluster.get(0).topologyService().localMember().address()));
 
-        metaStorageRaftSrv = new RaftServerImpl(
-            cluster.get(0),
-            FACTORY,
-            true
-        );
+        metaStorageRaftSrv = new RaftServerImpl(cluster.get(0), FACTORY);
 
         metaStorageRaftSrv.
             startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(keyValStorageMock), peers);
@@ -1029,8 +1025,7 @@ public class ITMetaStorageServiceTest {
             10_000,
             peers,
             true,
-            200,
-            true
+            200
         );
 
         return new MetaStorageServiceImpl(metaStorageRaftGrpSvc);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 154f825..d7d2685 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -103,6 +103,10 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
 
             /** {@inheritDoc} */
             @Override public void shutdown() {
+                // local member will be null, if cluster has not been started
+                if (cluster.member() == null)
+                    return;
+
                 stopJmxMonitor();
 
                 cluster.shutdown();
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
index e4a938c..0d04eec 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
@@ -90,9 +90,6 @@ public class RaftGroupServiceImpl implements RaftGroupService {
     /** */
     private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
 
-    /** */
-    private final boolean reuse;
-
     /**
      * @param groupId Group id.
      * @param cluster A cluster.
@@ -101,7 +98,6 @@ public class RaftGroupServiceImpl implements RaftGroupService {
      * @param peers Initial group configuration.
      * @param refreshLeader {@code True} to synchronously refresh leader on service creation.
      * @param retryDelay Retry delay.
-     * @param reuse {@code True} to reuse cluster service (avoid lifecycle management).
      */
     public RaftGroupServiceImpl(
         String groupId,
@@ -110,8 +106,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
         int timeout,
         List<Peer> peers,
         boolean refreshLeader,
-        long retryDelay,
-        boolean reuse
+        long retryDelay
     ) {
         this.cluster = requireNonNull(cluster);
         this.peers = requireNonNull(peers);
@@ -119,10 +114,6 @@ public class RaftGroupServiceImpl implements RaftGroupService {
         this.timeout = timeout;
         this.groupId = groupId;
         this.retryDelay = retryDelay;
-        this.reuse = reuse;
-
-        if (!reuse)
-            cluster.start();
 
         if (refreshLeader) {
             try {
@@ -342,8 +333,6 @@ public class RaftGroupServiceImpl implements RaftGroupService {
 
     /** {@inheritDoc} */
     @Override public void shutdown() {
-        if (!reuse)
-            cluster.shutdown();
     }
 
     /**
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
index de76740..c31687e 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
@@ -118,7 +118,7 @@ public class RaftGroupServiceTest {
         mockLeaderRequest(false);
 
         RaftGroupService service =
-            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
 
         assertNull(service.leader());
 
@@ -140,7 +140,7 @@ public class RaftGroupServiceTest {
         leader = null;
 
         RaftGroupService service =
-            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
 
         assertNull(service.leader());
 
@@ -175,7 +175,7 @@ public class RaftGroupServiceTest {
         }, 500);
 
         RaftGroupService service =
-            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
 
         assertNull(service.leader());
 
@@ -194,7 +194,7 @@ public class RaftGroupServiceTest {
         mockLeaderRequest(true);
 
         RaftGroupService service =
-            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
 
         try {
             service.refreshLeader().get(500, TimeUnit.MILLISECONDS);
@@ -217,7 +217,7 @@ public class RaftGroupServiceTest {
         mockUserInput(false, null);
 
         RaftGroupService service =
-            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
 
         service.refreshLeader().get();
 
@@ -237,7 +237,7 @@ public class RaftGroupServiceTest {
         mockUserInput(false, null);
 
         RaftGroupService service =
-            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
 
         assertNull(service.leader());
 
@@ -259,7 +259,7 @@ public class RaftGroupServiceTest {
         mockUserInput(true, null);
 
         RaftGroupService service =
-            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
 
         try {
             service.run(new TestCommand()).get(500, TimeUnit.MILLISECONDS);
@@ -282,7 +282,7 @@ public class RaftGroupServiceTest {
         mockUserInput(false, null);
 
         RaftGroupService service =
-            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, true);
+            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY);
 
         Peer leader = this.leader;
 
@@ -313,7 +313,7 @@ public class RaftGroupServiceTest {
         mockUserInput(false, null);
 
         RaftGroupService service =
-            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, true);
+            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY);
 
         Peer leader = this.leader;
 
@@ -349,7 +349,7 @@ public class RaftGroupServiceTest {
         mockUserInput(false, NODES.get(0));
 
         RaftGroupService service =
-            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT * 3, NODES, true, DELAY, true);
+            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT * 3, NODES, true, DELAY);
 
         Peer leader = this.leader;
 
@@ -387,7 +387,7 @@ public class RaftGroupServiceTest {
         mockUserInput(false, null);
 
         RaftGroupService service =
-            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, true);
+            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY);
 
         Peer leader = this.leader;
 
@@ -418,7 +418,7 @@ public class RaftGroupServiceTest {
         mockSnapshotRequest(1);
 
         RaftGroupService service =
-            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
 
         CompletableFuture<Void> fut = service.snapshot(new Peer("localhost:8082"));
 
@@ -442,7 +442,7 @@ public class RaftGroupServiceTest {
         mockSnapshotRequest(0);
 
         RaftGroupService service =
-            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, true);
+            new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
 
         CompletableFuture<Void> fut = service.snapshot(new Peer("localhost:8082"));
 
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
index 052f35d..05b8a56 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
@@ -34,9 +34,7 @@ import java.util.stream.Collectors;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessageSerializationRegistryImpl;
-import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
 import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
-import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.raft.jraft.CliService;
 import org.apache.ignite.raft.jraft.JRaftUtils;
 import org.apache.ignite.raft.jraft.Node;
@@ -70,29 +68,21 @@ public class ITCliServiceTest {
     /**
      * The logger.
      */
-    static final Logger LOG = LoggerFactory.getLogger(ITCliServiceTest.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ITCliServiceTest.class);
 
-    /**
-     * The registry.
-     */
-    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
-
-    /**
-     * The message factory.
-     */
-    private final static ScaleCubeClusterServiceFactory factory = new TestScaleCubeClusterServiceFactory();
+    private static final int LEARNER_PORT_STEP = 100;
 
     private String dataPath;
 
     private TestCluster cluster;
+
     private final String groupId = "CliServiceTest";
 
     private CliService cliService;
 
     private Configuration conf;
 
-    private static final int LEARNER_PORT_STEP = 100;
-
+    /** */
     @BeforeEach
     public void setup(TestInfo testInfo) throws Exception {
         LOG.info(">>>>>>>>>>>>>>> Start test method: " + testInfo.getDisplayName());
@@ -130,11 +120,26 @@ public class ITCliServiceTest {
         CliOptions opts = new CliOptions();
         opts.setClientExecutor(JRaftUtils.createClientExecutor(opts, "client"));
 
-        ClusterService clientSvc = factory.createClusterService(new ClusterLocalConfiguration("client",
-            TestUtils.INIT_PORT - 1, peers.stream().map(p -> p.getEndpoint().toString()).collect(Collectors.toList()),
-            SERIALIZATION_REGISTRY));
+        List<String> memberAddresses = peers.stream().map(p -> p.getEndpoint().toString()).collect(Collectors.toList());
+
+        var registry = new MessageSerializationRegistryImpl();
+
+        var serviceConfig = new ClusterLocalConfiguration("client", TestUtils.INIT_PORT - 1, memberAddresses, registry);
+
+        var factory = new TestScaleCubeClusterServiceFactory();
+
+        ClusterService clientSvc = factory.createClusterService(serviceConfig);
+
+        clientSvc.start();
+
+        IgniteRpcClient rpcClient = new IgniteRpcClient(clientSvc) {
+            @Override public void shutdown() {
+                super.shutdown();
+
+                clientSvc.shutdown();
+            }
+        };
 
-        IgniteRpcClient rpcClient = new IgniteRpcClient(clientSvc, false);
         opts.setRpcClient(rpcClient);
         assertTrue(cliService.init(opts));
     }
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
index 80c54e6..159bba9 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
@@ -35,10 +35,14 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.BiPredicate;
 import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import com.codahale.metrics.ConsoleReporter;
+import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
 import org.apache.ignite.raft.jraft.Iterator;
 import org.apache.ignite.raft.jraft.JRaftUtils;
 import org.apache.ignite.raft.jraft.Node;
@@ -101,20 +105,11 @@ import static org.junit.jupiter.api.Assertions.fail;
  * Integration tests for raft cluster.
  */
 public class ITNodeTest {
-    static final Logger LOG = LoggerFactory.getLogger(ITNodeTest.class);
-
-    private String dataPath;
-
-    private final AtomicInteger startedCounter = new AtomicInteger(0);
-    private final AtomicInteger stoppedCounter = new AtomicInteger(0);
-
-    private long testStartMs;
+    private static final Logger LOG = LoggerFactory.getLogger(ITNodeTest.class);
 
     private static DumpThread dumpThread;
 
-    private TestCluster cluster;
-
-    static class DumpThread extends Thread {
+    private static class DumpThread extends Thread {
         private static long DUMP_TIMEOUT_MS = 5 * 60 * 1000;
         private volatile boolean stopped = false;
 
@@ -135,6 +130,18 @@ public class ITNodeTest {
         }
     }
 
+    private String dataPath;
+
+    private final AtomicInteger startedCounter = new AtomicInteger(0);
+
+    private final AtomicInteger stoppedCounter = new AtomicInteger(0);
+
+    private long testStartMs;
+
+    private TestCluster cluster;
+
+    private final List<RaftGroupService> services = new ArrayList<>();
+
     @BeforeAll
     public static void setupNodeTest() {
         dumpThread = new DumpThread();
@@ -167,6 +174,14 @@ public class ITNodeTest {
 
     @AfterEach
     public void teardown(TestInfo testInfo) throws Exception {
+        services.forEach(service -> {
+            try {
+                service.shutdown();
+            } catch (Exception e) {
+                LOG.error("Error while closing a service", e);
+            }
+        });
+
         if (cluster != null)
             cluster.stopAll();
 
@@ -190,8 +205,6 @@ public class ITNodeTest {
         RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions);
 
         service.start(true);
-
-        service.shutdown();
     }
 
     @Test
@@ -235,13 +248,9 @@ public class ITNodeTest {
             node.apply(task);
             tasks.add(task);
         }
-        try {
-            Task.joinAll(tasks, TimeUnit.SECONDS.toMillis(30));
-            assertEquals(10, c.get());
-        }
-        finally {
-            service.shutdown();
-        }
+
+        Task.joinAll(tasks, TimeUnit.SECONDS.toMillis(30));
+        assertEquals(10, c.get());
     }
 
     /**
@@ -349,8 +358,6 @@ public class ITNodeTest {
         // No read-index request succeed.
         assertEquals(0, readIndexSuccesses.get());
         assertTrue(n - 1 >= currentValue.get());
-
-        service.shutdown();
     }
 
     @Test
@@ -380,7 +387,6 @@ public class ITNodeTest {
         int i = 0;
         for (ByteBuffer data : fsm.getLogs())
             assertEquals("hello" + i++, new String(data.array()));
-        service.shutdown();
     }
 
     @Test
@@ -1520,16 +1526,14 @@ public class ITNodeTest {
             NodeImpl follower0 = (NodeImpl) follower;
             DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService();
             RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient();
-            rpcClientEx.blockMessages(new BiPredicate<>() {
-                @Override public boolean test(Object msg, String nodeId) {
-                    if (msg instanceof RpcRequests.RequestVoteRequest) {
-                        RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest)msg;
-
-                        return !msg0.getPreVote();
-                    }
+            rpcClientEx.blockMessages((msg, nodeId) -> {
+                if (msg instanceof RpcRequests.RequestVoteRequest) {
+                    RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest)msg;
 
-                    return false;
+                    return !msg0.getPreVote();
                 }
+
+                return false;
             });
         }
 
@@ -2273,8 +2277,6 @@ public class ITNodeTest {
         node.snapshot(new ExpectClosure(RaftError.EINVAL, "Snapshot is not supported", latch));
         waitLatch(latch);
         assertEquals(0, fsm.getSaveSnapshotTimes());
-
-        service.shutdown();
     }
 
     @Test
@@ -2302,8 +2304,6 @@ public class ITNodeTest {
         int times = fsm.getSaveSnapshotTimes();
         assertTrue(times >= 1, "snapshotTimes=" + times);
         assertTrue(fsm.getSnapshotIndex() > 0);
-
-        service.shutdown();
     }
 
     @Test
@@ -2551,10 +2551,6 @@ public class ITNodeTest {
             catch (Exception e) {
                 // Expected.
             }
-            finally {
-                service.shutdown();
-            }
-
         }
     }
 
@@ -2906,22 +2902,18 @@ public class ITNodeTest {
         nodeOpts.setFsm(fsm);
 
         RaftGroupService service = createService("test", new PeerId(addr, 0), nodeOpts);
-        try {
-            Node node = service.start(true);
-            assertEquals(26, fsm.getLogs().size());
 
-            for (int i = 0; i < 26; i++)
-                assertEquals('a' + i, fsm.getLogs().get(i).get());
+        Node node = service.start(true);
+        assertEquals(26, fsm.getLogs().size());
 
-            // Group configuration will be restored from snapshot meta.
-            while (!node.isLeader())
-                Thread.sleep(20);
-            sendTestTaskAndWait(node);
-            assertEquals(36, fsm.getLogs().size());
-        }
-        finally {
-            service.shutdown();
-        }
+        for (int i = 0; i < 26; i++)
+            assertEquals('a' + i, fsm.getLogs().get(i).get());
+
+        // Group configuration will be restored from snapshot meta.
+        while (!node.isLeader())
+            Thread.sleep(20);
+        sendTestTaskAndWait(node);
+        assertEquals(36, fsm.getLogs().size());
     }
 
     @Test
@@ -2948,16 +2940,12 @@ public class ITNodeTest {
         nodeOpts.setFsm(fsm);
 
         RaftGroupService service = createService("test", new PeerId(addr, 0), nodeOpts);
-        try {
-            Node node = service.start(true);
-            while (!node.isLeader())
-                Thread.sleep(20);
-            sendTestTaskAndWait(node);
-            assertEquals(10, fsm.getLogs().size());
-        }
-        finally {
-            service.shutdown();
-        }
+
+        Node node = service.start(true);
+        while (!node.isLeader())
+            Thread.sleep(20);
+        sendTestTaskAndWait(node);
+        assertEquals(10, fsm.getLogs().size());
     }
 
     @Test
@@ -3344,16 +3332,14 @@ public class ITNodeTest {
             NodeImpl follower0 = (NodeImpl) follower;
             DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService();
             RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient();
-            rpcClientEx.blockMessages(new BiPredicate<>() {
-                @Override public boolean test(Object msg, String nodeId) {
-                    if (msg instanceof RpcRequests.RequestVoteRequest) {
-                        RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest)msg;
-
-                        return !msg0.getPreVote();
-                    }
+            rpcClientEx.blockMessages((msg, nodeId) -> {
+                if (msg instanceof RpcRequests.RequestVoteRequest) {
+                    RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest)msg;
 
-                    return false;
+                    return !msg0.getPreVote();
                 }
+
+                return false;
             });
         }
 
@@ -3453,23 +3439,49 @@ public class ITNodeTest {
      * @return Raft group service.
      */
     private RaftGroupService createService(String groupId, PeerId peerId, NodeOptions nodeOptions) {
-        NodeManager nodeManager = new NodeManager();
-
-        List<String> servers = new ArrayList<>();
-
         Configuration initialConf = nodeOptions.getInitialConf();
 
-        if (initialConf != null) {
-            for (PeerId id : initialConf.getPeers())
-                servers.add(id.getEndpoint().toString());
+        var servers = List.<String>of();
 
-            for (PeerId id : initialConf.getLearners())
-                servers.add(id.getEndpoint().toString());
+        if (initialConf != null) {
+            servers = Stream.concat(initialConf.getPeers().stream(), initialConf.getLearners().stream())
+                .map(id -> id.getEndpoint().toString())
+                .collect(Collectors.toList());
         }
 
-        IgniteRpcServer rpcServer = new TestIgniteRpcServer(peerId.getEndpoint(), servers, nodeManager);
-        nodeOptions.setRpcClient(new IgniteRpcClient(rpcServer.clusterService(), true));
+        var nodeManager = new NodeManager();
+
+        ClusterService clusterService = createClusterService(peerId.getEndpoint(), servers);
+
+        IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, servers, nodeManager);
+
+        nodeOptions.setRpcClient(new IgniteRpcClient(clusterService));
+
+        clusterService.start();
+
+        var service = new RaftGroupService(groupId, peerId, nodeOptions, rpcServer, nodeManager) {
+            @Override public synchronized void shutdown() {
+                super.shutdown();
+
+                clusterService.shutdown();
+            }
+        };
+
+        services.add(service);
+
+        return service;
+    }
+
+    /**
+     * Creates a non-started {@link ClusterService}.
+     */
+    private static ClusterService createClusterService(Endpoint endpoint, List<String> members) {
+        var registry = new TestMessageSerializationRegistryImpl();
+
+        var clusterConfig = new ClusterLocalConfiguration(endpoint.toString(), endpoint.getPort(), members, registry);
+
+        var clusterServiceFactory = new TestScaleCubeClusterServiceFactory();
 
-        return new RaftGroupService(groupId, peerId, nodeOptions, rpcServer, nodeManager);
+        return clusterServiceFactory.createClusterService(clusterConfig);
     }
 }
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
index 06c949b..c07a815 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
@@ -102,12 +102,12 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
     /**
      * Servers list.
      */
-    protected List<JRaftServerImpl> servers = new ArrayList<>();
+    protected final List<JRaftServerImpl> servers = new ArrayList<>();
 
     /**
      * Clients list.
      */
-    protected List<RaftGroupService> clients = new ArrayList<>();
+    private final List<RaftGroupService> clients = new ArrayList<>();
 
     /**
      * Data path.
@@ -146,9 +146,15 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
      */
     private JRaftServerImpl startServer(int idx, Consumer<RaftServer> clo) {
         ClusterService service = clusterService("server" + idx, PORT + idx,
-            List.of(getLocalAddress() + ":" + PORT), false);
+            List.of(getLocalAddress() + ":" + PORT), true);
 
-        JRaftServerImpl server = new JRaftServerImpl(service, dataPath, FACTORY, false);
+        JRaftServerImpl server = new JRaftServerImpl(service, dataPath, FACTORY) {
+            @Override public void shutdown() throws Exception {
+                super.shutdown();
+
+                service.shutdown();
+            }
+        };
 
         clo.accept(server);
 
@@ -166,11 +172,17 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
     private RaftGroupService startClient(String groupId) {
         String addr = getLocalAddress() + ":" + PORT;
 
-        ClusterService clientNode1 = clusterService("client_" + groupId + "_", CLIENT_PORT + clients.size(),
-            List.of(addr), false);
+        ClusterService clientNode = clusterService("client_" + groupId + "_", CLIENT_PORT + clients.size(),
+            List.of(addr), true);
 
-        RaftGroupServiceImpl client = new RaftGroupServiceImpl(groupId, clientNode1, FACTORY, 10_000,
-            List.of(new Peer(addr)), false, 200, false);
+        RaftGroupServiceImpl client = new RaftGroupServiceImpl(groupId, clientNode, FACTORY, 10_000,
+            List.of(new Peer(addr)), false, 200) {
+            @Override public void shutdown() {
+                super.shutdown();
+
+                clientNode.shutdown();
+            }
+        };
 
         clients.add(client);
 
@@ -345,7 +357,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
     public void testApplyWithFailure() throws Exception {
         listenerFactory = () -> new CounterListener() {
             @Override public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
-                Iterator<CommandClosure<WriteCommand>> wrapper = new Iterator<CommandClosure<WriteCommand>>() {
+                Iterator<CommandClosure<WriteCommand>> wrapper = new Iterator<>() {
                     @Override public boolean hasNext() {
                         return iterator.hasNext();
                     }
@@ -353,7 +365,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
                     @Override public CommandClosure<WriteCommand> next() {
                         CommandClosure<WriteCommand> cmd = iterator.next();
 
-                        IncrementAndGetCommand command = (IncrementAndGetCommand) cmd.command();
+                        IncrementAndGetCommand command = (IncrementAndGetCommand)cmd.command();
 
                         if (command.delta() == 10)
                             throw new IgniteInternalException("Very bad");
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITSimpleCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITSimpleCounterServerTest.java
index 6296836..efb701c 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITSimpleCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITSimpleCounterServerTest.java
@@ -19,12 +19,12 @@ package org.apache.ignite.raft.server;
 
 import java.util.List;
 import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
-import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -74,24 +74,42 @@ class ITSimpleCounterServerTest extends RaftServerAbstractTest {
 
         String id = "localhost:" + PORT;
 
-        ClusterService service = clusterService(id, PORT, List.of(), false);
+        ClusterService service = clusterService(id, PORT, List.of(), true);
+
+        server = new RaftServerImpl(service, FACTORY) {
+            @Override public synchronized void shutdown() throws Exception {
+                super.shutdown();
 
-        server = new RaftServerImpl(service, FACTORY, false);
+                service.shutdown();
+            }
+        };
 
-        ClusterNode serverNode = this.server.clusterService().topologyService().localMember();
+        ClusterNode serverNode = server.clusterService().topologyService().localMember();
 
-        this.server.startRaftGroup(COUNTER_GROUP_ID_0, new CounterListener(), List.of(new Peer(serverNode.address())));
-        this.server.startRaftGroup(COUNTER_GROUP_ID_1, new CounterListener(), List.of(new Peer(serverNode.address())));
+        server.startRaftGroup(COUNTER_GROUP_ID_0, new CounterListener(), List.of(new Peer(serverNode.address())));
+        server.startRaftGroup(COUNTER_GROUP_ID_1, new CounterListener(), List.of(new Peer(serverNode.address())));
 
-        ClusterService clientNode1 = clusterService("localhost:" + (PORT + 1), PORT + 1, List.of(id), false);
+        ClusterService clientNode1 = clusterService("localhost:" + (PORT + 1), PORT + 1, List.of(id), true);
 
         client1 = new RaftGroupServiceImpl(COUNTER_GROUP_ID_0, clientNode1, FACTORY, 1000,
-            List.of(new Peer(serverNode.address())), false, 200, false);
+            List.of(new Peer(serverNode.address())), false, 200) {
+            @Override public void shutdown() {
+                super.shutdown();
 
-        ClusterService clientNode2 = clusterService("localhost:" + (PORT + 2), PORT + 2, List.of(id), false);
+                clientNode1.shutdown();
+            }
+        };
+
+        ClusterService clientNode2 = clusterService("localhost:" + (PORT + 2), PORT + 2, List.of(id), true);
 
         client2 = new RaftGroupServiceImpl(COUNTER_GROUP_ID_1, clientNode2, FACTORY, 1000,
-            List.of(new Peer(serverNode.address())), false, 200, false);
+            List.of(new Peer(serverNode.address())), false, 200) {
+            @Override public void shutdown() {
+                super.shutdown();
+
+                clientNode2.shutdown();
+            }
+        };
 
         assertTrue(waitForTopology(service, 2, 1000));
         assertTrue(waitForTopology(clientNode1, 2, 1000));
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index c6dc324..bfa8066 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -56,7 +56,7 @@ public class Loza {
     public Loza(ClusterService clusterNetSvc) {
         this.clusterNetSvc = clusterNetSvc;
 
-        this.raftServer = new RaftServerImpl(clusterNetSvc, FACTORY, true);
+        this.raftServer = new RaftServerImpl(clusterNetSvc, FACTORY);
     }
 
     /**
@@ -82,8 +82,7 @@ public class Loza {
             TIMEOUT,
             peers.stream().map(i -> new Peer(i.address())).collect(Collectors.toList()),
             true,
-            DELAY,
-            true
+            DELAY
         );
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
index 1d43aed..e29af9b 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
@@ -78,24 +78,21 @@ public class JRaftServerImpl implements RaftServer {
      * @param service Cluster service.
      * @param dataPath Data path.
      * @param factory The factory.
-     * @param reuse {@code True} to reuse cluster service (do not manage lifecyle)
      */
-    public JRaftServerImpl(ClusterService service, String dataPath, RaftClientMessagesFactory factory, boolean reuse) {
-        this(service, dataPath, factory, reuse, new NodeOptions());
+    public JRaftServerImpl(ClusterService service, String dataPath, RaftClientMessagesFactory factory) {
+        this(service, dataPath, factory, new NodeOptions());
     }
 
     /**
      * @param service Cluster service.
      * @param dataPath Data path.
      * @param factory The factory.
-     * @param reuse {@code True} to reuse cluster service (do not manage lifecyle)
      * @param opts Default node options.
      */
     public JRaftServerImpl(
         ClusterService service,
         String dataPath,
         RaftClientMessagesFactory factory,
-        boolean reuse,
         NodeOptions opts
     ) {
         this.service = service;
@@ -103,7 +100,7 @@ public class JRaftServerImpl implements RaftServer {
         this.nodeManager = new NodeManager();
         this.opts = opts;
 
-        assert !reuse || service.topologyService().localMember() != null;
+        assert service.topologyService().localMember() != null;
 
         if (opts.getServerName() == null)
             opts.setServerName(service.localConfiguration().getName());
@@ -120,7 +117,7 @@ public class JRaftServerImpl implements RaftServer {
         if (opts.getClientExecutor() == null)
             opts.setClientExecutor(JRaftUtils.createClientExecutor(opts, opts.getServerName()));
 
-        rpcServer = new IgniteRpcServer(service, reuse, nodeManager, factory, JRaftUtils.createRequestExecutor(opts));
+        rpcServer = new IgniteRpcServer(service, nodeManager, factory, JRaftUtils.createRequestExecutor(opts));
 
         rpcServer.init(null);
     }
@@ -164,14 +161,12 @@ public class JRaftServerImpl implements RaftServer {
         nodeOptions.setFsm(new DelegatingStateMachine(lsnr));
 
         if (initialConf != null) {
-            List<PeerId> mapped = initialConf.stream().map(p -> {
-                return PeerId.fromPeer(p);
-            }).collect(Collectors.toList());
+            List<PeerId> mapped = initialConf.stream().map(PeerId::fromPeer).collect(Collectors.toList());
 
             nodeOptions.setInitialConf(new Configuration(mapped, null));
         }
 
-        IgniteRpcClient client = new IgniteRpcClient(service, true);
+        IgniteRpcClient client = new IgniteRpcClient(service);
 
         nodeOptions.setRpcClient(client);
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
index 1ac9972..1139159 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
@@ -76,25 +76,16 @@ public class RaftServerImpl implements RaftServer {
     /** */
     private final Thread writeWorker;
 
-    /** */
-    private final boolean reuse;
-
     /**
      * @param service Network service.
      * @param clientMsgFactory Client message factory.
-     * @param reuse {@code True} to reuse cluster service.
      */
-    public RaftServerImpl(
-        ClusterService service,
-        RaftClientMessagesFactory clientMsgFactory,
-        boolean reuse
-    ) {
+    public RaftServerImpl(ClusterService service, RaftClientMessagesFactory clientMsgFactory) {
         Objects.requireNonNull(service);
         Objects.requireNonNull(clientMsgFactory);
 
         this.service = service;
         this.clientMsgFactory = clientMsgFactory;
-        this.reuse = reuse;
 
         readQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
         writeQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
@@ -124,9 +115,6 @@ public class RaftServerImpl implements RaftServer {
             // TODO https://issues.apache.org/jira/browse/IGNITE-14775
         });
 
-        if (!reuse)
-            service.start();
-
         readWorker = new Thread(() -> processQueue(readQueue, RaftGroupListener::onRead), "read-cmd-worker#" + service.topologyService().localMember().toString());
         readWorker.setDaemon(true);
         readWorker.start();
@@ -172,9 +160,6 @@ public class RaftServerImpl implements RaftServer {
         writeWorker.interrupt();
         writeWorker.join();
 
-        if (!reuse)
-            service.shutdown();
-
         LOG.info("Stopped replication server [node=" + service.toString() + ']');
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
index f0d0495..6d23508 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.function.BiPredicate;
-import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.TopologyEventHandler;
@@ -47,18 +46,11 @@ public class IgniteRpcClient implements RpcClientEx {
 
     private final ClusterService service;
 
-    private final boolean reuse;
-
     /**
      * @param service The service.
-     * @param reuse {@code True} to reuse already started service.
      */
-    public IgniteRpcClient(ClusterService service, boolean reuse) {
+    public IgniteRpcClient(ClusterService service) {
         this.service = service;
-        this.reuse = reuse;
-
-        if (!reuse) // TODO asch use init
-            service.start();
     }
 
     public ClusterService clusterService() {
@@ -181,13 +173,6 @@ public class IgniteRpcClient implements RpcClientEx {
 
     /** {@inheritDoc} */
     @Override public void shutdown() {
-        try {
-            if (!reuse)
-                service.shutdown();
-        }
-        catch (Exception e) {
-            throw new IgniteInternalException(e);
-        }
     }
 
     /** {@inheritDoc} */
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index e5fc4e3..31404a1 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -21,11 +21,9 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
-import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.NetworkMessageHandler;
 import org.apache.ignite.network.TopologyEventHandler;
 import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
 import org.apache.ignite.raft.jraft.NodeManager;
@@ -59,30 +57,24 @@ public class IgniteRpcServer implements RpcServer<Void> {
     /** Factory. */
     private static final RaftClientMessagesFactory FACTORY = new RaftClientMessagesFactory();
 
-    /** The {@code true} to reuse cluster service. */
-    private final boolean reuse;
-
     private final ClusterService service;
 
-    private List<ConnectionClosedEventListener> listeners = new CopyOnWriteArrayList<>();
+    private final List<ConnectionClosedEventListener> listeners = new CopyOnWriteArrayList<>();
 
-    private Map<String, RpcProcessor> processors = new ConcurrentHashMap<>();
+    private final Map<String, RpcProcessor> processors = new ConcurrentHashMap<>();
 
     /**
      * @param service The cluster service.
-     * @param reuse {@code True} to reuse service (do no manage lifecycle).
      * @param nodeManager The node manager.
      * @param factory Message factory.
      * @param rpcExecutor The executor for RPC requests.
      */
     public IgniteRpcServer(
         ClusterService service,
-        boolean reuse,
         NodeManager nodeManager,
         RaftClientMessagesFactory factory,
         @Nullable Executor rpcExecutor
     ) {
-        this.reuse = reuse;
         this.service = service;
 
         // raft server RPC
@@ -112,60 +104,59 @@ public class IgniteRpcServer implements RpcServer<Void> {
         registerProcessor(new ActionRequestProcessor(rpcExecutor, FACTORY));
         registerProcessor(new org.apache.ignite.raft.jraft.rpc.impl.client.SnapshotRequestProcessor(rpcExecutor, FACTORY));
 
-        service.messagingService().addMessageHandler(new NetworkMessageHandler() {
-            @Override public void onReceived(NetworkMessage msg, ClusterNode sender, String corellationId) {
-                Class<? extends NetworkMessage> cls = msg.getClass();
-                RpcProcessor prc = processors.get(cls.getName());
+        service.messagingService().addMessageHandler((msg, sender, corellationId) -> {
+            Class<? extends NetworkMessage> cls = msg.getClass();
+            RpcProcessor<NetworkMessage> prc = processors.get(cls.getName());
 
-                // TODO asch cache mapping https://issues.apache.org/jira/browse/IGNITE-14832
-                if (prc == null) {
-                    for (Class<?> iface : cls.getInterfaces()) {
-                        prc = processors.get(iface.getName());
+            // TODO asch cache mapping https://issues.apache.org/jira/browse/IGNITE-14832
+            if (prc == null) {
+                for (Class<?> iface : cls.getInterfaces()) {
+                    prc = processors.get(iface.getName());
 
-                        if (prc != null)
-                            break;
-                    }
+                    if (prc != null)
+                        break;
                 }
+            }
 
-                if (prc == null)
-                    return;
+            if (prc == null)
+                return;
 
-                RpcProcessor.ExecutorSelector selector = prc.executorSelector();
+            RpcProcessor.ExecutorSelector selector = prc.executorSelector();
 
-                Executor executor = null;
+            Executor executor = null;
 
-                if (selector != null) {
-                    executor = selector.select(prc.getClass().getName(), msg, nodeManager);
-                }
+            if (selector != null)
+                executor = selector.select(prc.getClass().getName(), msg, nodeManager);
 
-                if (executor == null)
-                    executor = prc.executor();
+            if (executor == null)
+                executor = prc.executor();
 
-                if (executor == null)
-                    executor = rpcExecutor;
+            if (executor == null)
+                executor = rpcExecutor;
 
-                RpcProcessor finalPrc = prc;
+            RpcProcessor<NetworkMessage> finalPrc = prc;
 
-                executor.execute(() -> {
-                    finalPrc.handleRequest(new RpcContext() {
-                        @Override public NodeManager getNodeManager() {
-                            return nodeManager;
-                        }
+            executor.execute(() -> {
+                var context = new RpcContext() {
+                    @Override public NodeManager getNodeManager() {
+                        return nodeManager;
+                    }
 
-                        @Override public void sendResponse(Object responseObj) {
-                            service.messagingService().send(sender, (NetworkMessage) responseObj, corellationId);
-                        }
+                    @Override public void sendResponse(Object responseObj) {
+                        service.messagingService().send(sender, (NetworkMessage) responseObj, corellationId);
+                    }
 
-                        @Override public String getRemoteAddress() {
-                            return sender.address();
-                        }
+                    @Override public String getRemoteAddress() {
+                        return sender.address();
+                    }
 
-                        @Override public String getLocalAddress() {
-                            return service.topologyService().localMember().address();
-                        }
-                    }, msg);
-                });
-            }
+                    @Override public String getLocalAddress() {
+                        return service.topologyService().localMember().address();
+                    }
+                };
+
+                finalPrc.handleRequest(context, msg);
+            });
         });
 
         service.topologyService().addEventHandler(new TopologyEventHandler() {
@@ -180,23 +171,24 @@ public class IgniteRpcServer implements RpcServer<Void> {
         });
     }
 
+    /** {@inheritDoc} */
     @Override public void registerConnectionClosedEventListener(ConnectionClosedEventListener listener) {
-        if (!listeners.contains(listeners))
+        if (!listeners.contains(listener))
             listeners.add(listener);
     }
 
+    /** {@inheritDoc} */
     @Override public void registerProcessor(RpcProcessor<?> processor) {
         processors.put(processor.interest(), processor);
     }
 
+    /** {@inheritDoc} */
     @Override public int boundPort() {
         return 0;
     }
 
+    /** {@inheritDoc} */
     @Override public boolean init(Void opts) {
-        if (!reuse)
-            service.start();
-
         return true;
     }
 
@@ -204,15 +196,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
         return service;
     }
 
+    /** {@inheritDoc} */
     @Override public void shutdown() {
-        if (reuse)
-            return;
-
-        try {
-            service.shutdown();
-        }
-        catch (Exception e) {
-            throw new IgniteInternalException(e);
-        }
     }
 }
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 94a2efe..92fe82a 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -30,6 +30,10 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
 import org.apache.ignite.raft.jraft.JRaftServiceFactory;
 import org.apache.ignite.raft.jraft.JRaftUtils;
 import org.apache.ignite.raft.jraft.Node;
@@ -41,7 +45,6 @@ import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.rpc.TestIgniteRpcServer;
 import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
-import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
 import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
 import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.raft.jraft.util.Endpoint;
@@ -64,7 +67,7 @@ public class TestCluster {
     private final List<PeerId> peers;
     private final List<NodeImpl> nodes;
     private final LinkedHashMap<PeerId, MockStateMachine> fsms;
-    private final ConcurrentMap<String, RaftGroupService> serverMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Endpoint, RaftGroupService> serverMap = new ConcurrentHashMap<>();
     private final int electionTimeoutMs;
     private final Lock lock = new ReentrantLock();
     private final Consumer<NodeOptions> optsClo;
@@ -160,68 +163,93 @@ public class TestCluster {
         final boolean enableMetrics, final SnapshotThrottle snapshotThrottle,
         final RaftOptions raftOptions, final int priority) throws IOException {
 
-        if (this.serverMap.get(listenAddr.toString()) != null) {
-            return true;
-        }
+        this.lock.lock();
+        try {
+            if (this.serverMap.get(listenAddr) != null) {
+                return true;
+            }
 
-        final NodeOptions nodeOptions = new NodeOptions();
+            final NodeOptions nodeOptions = new NodeOptions();
 
-        nodeOptions.setServerName(listenAddr.toString());
+            nodeOptions.setServerName(listenAddr.toString());
 
-        nodeOptions.setCommonExecutor(JRaftUtils.createCommonExecutor(nodeOptions));
-        nodeOptions.setStripedExecutor(JRaftUtils.createAppendEntriesExecutor(nodeOptions));
-        nodeOptions.setClientExecutor(JRaftUtils.createClientExecutor(nodeOptions, nodeOptions.getServerName()));
-        nodeOptions.setScheduler(JRaftUtils.createScheduler(nodeOptions));
+            nodeOptions.setCommonExecutor(JRaftUtils.createCommonExecutor(nodeOptions));
+            nodeOptions.setStripedExecutor(JRaftUtils.createAppendEntriesExecutor(nodeOptions));
+            nodeOptions.setClientExecutor(JRaftUtils.createClientExecutor(nodeOptions, nodeOptions.getServerName()));
+            nodeOptions.setScheduler(JRaftUtils.createScheduler(nodeOptions));
 
-        nodeOptions.setElectionTimeoutMs(this.electionTimeoutMs);
-        nodeOptions.setEnableMetrics(enableMetrics);
-        nodeOptions.setSnapshotThrottle(snapshotThrottle);
-        nodeOptions.setSnapshotIntervalSecs(snapshotIntervalSecs);
-        nodeOptions.setServiceFactory(this.raftServiceFactory);
-        if (raftOptions != null) {
-            nodeOptions.setRaftOptions(raftOptions);
-        }
-        final String serverDataPath = this.dataPath + File.separator + listenAddr.toString().replace(':', '_');
-        new File(serverDataPath).mkdirs();
-        nodeOptions.setLogUri(serverDataPath + File.separator + "logs");
-        nodeOptions.setRaftMetaUri(serverDataPath + File.separator + "meta");
-        nodeOptions.setSnapshotUri(serverDataPath + File.separator + "snapshot");
-        nodeOptions.setElectionPriority(priority);
-
-        final MockStateMachine fsm = new MockStateMachine(listenAddr);
-        nodeOptions.setFsm(fsm);
-
-        if (!emptyPeers) {
-            nodeOptions.setInitialConf(new Configuration(this.peers, this.learners));
-        }
+            nodeOptions.setElectionTimeoutMs(this.electionTimeoutMs);
+            nodeOptions.setEnableMetrics(enableMetrics);
+            nodeOptions.setSnapshotThrottle(snapshotThrottle);
+            nodeOptions.setSnapshotIntervalSecs(snapshotIntervalSecs);
+            nodeOptions.setServiceFactory(this.raftServiceFactory);
+            if (raftOptions != null) {
+                nodeOptions.setRaftOptions(raftOptions);
+            }
+            final String serverDataPath = this.dataPath + File.separator + listenAddr.toString().replace(':', '_');
+            new File(serverDataPath).mkdirs();
+            nodeOptions.setLogUri(serverDataPath + File.separator + "logs");
+            nodeOptions.setRaftMetaUri(serverDataPath + File.separator + "meta");
+            nodeOptions.setSnapshotUri(serverDataPath + File.separator + "snapshot");
+            nodeOptions.setElectionPriority(priority);
+
+            final MockStateMachine fsm = new MockStateMachine(listenAddr);
+            nodeOptions.setFsm(fsm);
+
+            if (!emptyPeers) {
+                nodeOptions.setInitialConf(new Configuration(this.peers, this.learners));
+            }
 
-        List<String> servers = emptyPeers ? List.of() : this.peers.stream().map(p -> p.getEndpoint().toString()).collect(Collectors.toList());
+            List<String> servers = emptyPeers ? List.of() : this.peers.stream().map(p -> p.getEndpoint().toString()).collect(Collectors.toList());
 
-        NodeManager nodeManager = new NodeManager();
+            NodeManager nodeManager = new NodeManager();
 
-        final IgniteRpcServer rpcServer = new TestIgniteRpcServer(listenAddr, servers, nodeManager);
-        nodeOptions.setRpcClient(new IgniteRpcClient(rpcServer.clusterService(), true));
+            ClusterService clusterService = createClusterService(listenAddr, servers);
 
-        if (optsClo != null)
-            optsClo.accept(nodeOptions);
+            var rpcClient = new IgniteRpcClient(clusterService);
 
-        final RaftGroupService server = new RaftGroupService(this.name, new PeerId(listenAddr, 0, priority),
-            nodeOptions, rpcServer, nodeManager);
+            nodeOptions.setRpcClient(rpcClient);
 
-        this.lock.lock();
-        try {
-            if (this.serverMap.put(listenAddr.toString(), server) == null) {
-                final Node node = server.start();
+            var rpcServer = new TestIgniteRpcServer(clusterService, servers, nodeManager);
 
-                this.fsms.put(new PeerId(listenAddr, 0), fsm);
-                this.nodes.add((NodeImpl) node);
-                return true;
-            }
+            clusterService.start();
+
+            if (optsClo != null)
+                optsClo.accept(nodeOptions);
+
+            final RaftGroupService server = new RaftGroupService(this.name, new PeerId(listenAddr, 0, priority),
+                nodeOptions, rpcServer, nodeManager) {
+                @Override public synchronized void shutdown() {
+                    super.shutdown();
+
+                    clusterService.shutdown();
+                }
+            };
+
+            this.serverMap.put(listenAddr, server);
+
+            final Node node = server.start();
+
+            this.fsms.put(new PeerId(listenAddr, 0), fsm);
+            this.nodes.add((NodeImpl) node);
+            return true;
         }
         finally {
             this.lock.unlock();
         }
-        return false;
+    }
+
+    /**
+     * Creates a non-started {@link ClusterService}.
+     */
+    private static ClusterService createClusterService(Endpoint endpoint, List<String> members) {
+        var registry = new TestMessageSerializationRegistryImpl();
+
+        var clusterConfig = new ClusterLocalConfiguration(endpoint.toString(), endpoint.getPort(), members, registry);
+
+        var clusterServiceFactory = new TestScaleCubeClusterServiceFactory();
+
+        return clusterServiceFactory.createClusterService(clusterConfig);
     }
 
     public Node getNode(Endpoint endpoint) {
@@ -240,7 +268,7 @@ public class TestCluster {
     }
 
     public RaftGroupService getServer(Endpoint endpoint) {
-        return serverMap.get(endpoint.toString());
+        return serverMap.get(endpoint);
     }
 
     public MockStateMachine getFsmByPeer(final PeerId peer) {
@@ -265,7 +293,7 @@ public class TestCluster {
 
     public boolean stop(final Endpoint listenAddr) throws InterruptedException {
         removeNode(listenAddr);
-        final RaftGroupService raftGroupService = this.serverMap.remove(listenAddr.toString());
+        final RaftGroupService raftGroupService = this.serverMap.remove(listenAddr);
         raftGroupService.shutdown();
         return true;
     }
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
index 01cd57f..9948db1 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
@@ -46,7 +46,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 public abstract class AbstractRpcTest {
     protected Endpoint endpoint;
 
-    private final List<RpcServer<?>> servers = new ArrayList<>();
+    private RpcServer<?> server;
 
     private final List<RpcClient> clients = new ArrayList<>();
 
@@ -54,18 +54,18 @@ public abstract class AbstractRpcTest {
     public void setup() {
         endpoint = new Endpoint(TestUtils.getLocalAddress(), INIT_PORT);
 
-        RpcServer<?> server = createServer(endpoint);
+        server = createServer(endpoint);
+
         server.registerProcessor(new Request1RpcProcessor());
         server.registerProcessor(new Request2RpcProcessor());
         server.init(null);
-
-        servers.add(server);
     }
 
     @AfterEach
     public void tearDown() {
         clients.forEach(RpcClient::shutdown);
-        servers.forEach(RpcServer::shutdown);
+
+        server.shutdown();
     }
 
     /**
@@ -80,6 +80,8 @@ public abstract class AbstractRpcTest {
     private RpcClient createClient() {
         RpcClient client = createClient0();
 
+        client.init(null);
+
         clients.add(client);
 
         return client;
@@ -135,7 +137,7 @@ public abstract class AbstractRpcTest {
         assertTrue(client1.checkConnection(endpoint));
         assertTrue(client2.checkConnection(endpoint));
 
-        servers.get(0).shutdown();
+        server.shutdown();
 
         assertTrue(waitForTopology(client1, 2, 5_000));
         assertTrue(waitForTopology(client2, 2, 5_000));
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
index 52f1ec8..d5370b9 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
@@ -23,7 +23,6 @@ import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessageSerializationRegistryImpl;
 import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
-import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.raft.jraft.NodeManager;
 import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
 import org.apache.ignite.raft.jraft.util.Endpoint;
@@ -32,17 +31,24 @@ import org.apache.ignite.raft.jraft.util.Endpoint;
  *
  */
 public class IgniteRpcTest extends AbstractRpcTest {
-    /**
-     * Serialization registry.
-     */
-    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
-
     /** The counter. */
     private final AtomicInteger cntr = new AtomicInteger();
 
     /** {@inheritDoc} */
     @Override public RpcServer<?> createServer(Endpoint endpoint) {
-        return new TestIgniteRpcServer(endpoint, new NodeManager());
+        ClusterService service = createService(endpoint.toString(), endpoint.getPort(), List.of());
+
+        var server = new TestIgniteRpcServer(service, List.of(), new NodeManager()) {
+            @Override public void shutdown() {
+                super.shutdown();
+
+                service.shutdown();
+            }
+        };
+
+        service.start();
+
+        return server;
     }
 
     /** {@inheritDoc} */
@@ -51,7 +57,15 @@ public class IgniteRpcTest extends AbstractRpcTest {
 
         ClusterService service = createService("client" + i, endpoint.getPort() - i, List.of(endpoint.toString()));
 
-        IgniteRpcClient client = new IgniteRpcClient(service, false);
+        IgniteRpcClient client = new IgniteRpcClient(service) {
+            @Override public void shutdown() {
+                super.shutdown();
+
+                service.shutdown();
+            }
+        };
+
+        service.start();
 
         waitForTopology(client, 1 + i, 5_000);
 
@@ -64,8 +78,9 @@ public class IgniteRpcTest extends AbstractRpcTest {
      * @param servers Server nodes of the cluster.
      * @return The client cluster view.
      */
-    protected ClusterService createService(String name, int port, List<String> servers) {
-        var context = new ClusterLocalConfiguration(name, port, servers, SERIALIZATION_REGISTRY);
+    private static ClusterService createService(String name, int port, List<String> servers) {
+        var registry = new MessageSerializationRegistryImpl();
+        var context = new ClusterLocalConfiguration(name, port, servers, registry);
         var factory = new TestScaleCubeClusterServiceFactory();
 
         return factory.createClusterService(context);
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
index 1671155..9052f10 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
@@ -18,63 +18,28 @@
 package org.apache.ignite.raft.jraft.rpc;
 
 import java.util.List;
-import org.apache.ignite.network.ClusterLocalConfiguration;
-import org.apache.ignite.network.MessageSerializationRegistryImpl;
-import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
-import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
-import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
 import org.apache.ignite.raft.jraft.JRaftUtils;
 import org.apache.ignite.raft.jraft.NodeManager;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
-import org.apache.ignite.raft.jraft.util.Endpoint;
 
 /**
  * RPC server configured for integration tests.
  */
 public class TestIgniteRpcServer extends IgniteRpcServer {
     /**
-     * The registry.
+     * @param clusterService cluster service
+     * @param servers server list
+     * @param nodeManager node manager
      */
-    /** */
-    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
-
-    /**
-     * Service factory.
-     */
-    private final static ScaleCubeClusterServiceFactory SERVICE_FACTORY = new TestScaleCubeClusterServiceFactory();
-
-    /**
-     * Message factory.
-     */
-    private static final RaftClientMessagesFactory MSG_FACTORY = new RaftClientMessagesFactory();
-
-    /**
-     * @param endpoint The endpoint.
-     * @param nodeManager The node manager.
-     */
-    public TestIgniteRpcServer(Endpoint endpoint, NodeManager nodeManager) {
-        this(endpoint.getIp() + ":" + endpoint.getPort(), endpoint.getPort(), List.of(), nodeManager);
-    }
-
-    /**
-     * @param endpoint The endpoint.
-     * @param servers Server list.
-     * @param nodeManager The node manager.
-     */
-    public TestIgniteRpcServer(Endpoint endpoint, List<String> servers, NodeManager nodeManager) {
-        this(endpoint.getIp() + ":" + endpoint.getPort(), endpoint.getPort(), servers, nodeManager);
-    }
-
-    /**
-     * @param name The name.
-     * @param port The port.
-     * @param servers Server list.
-     * @param nodeManager The node manager.
-     */
-    public TestIgniteRpcServer(String name, int port, List<String> servers, NodeManager nodeManager) {
-        super(SERVICE_FACTORY.createClusterService(new ClusterLocalConfiguration(name, port, servers, SERIALIZATION_REGISTRY)),
-            false, nodeManager, MSG_FACTORY, JRaftUtils.createRequestExecutor(new NodeOptions()));
+    public TestIgniteRpcServer(ClusterService clusterService, List<String> servers, NodeManager nodeManager) {
+        super(
+            clusterService,
+            nodeManager,
+            new RaftClientMessagesFactory(),
+            JRaftUtils.createRequestExecutor(new NodeOptions())
+        );
     }
 }
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index cab6b97..d6cec07 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -158,17 +158,13 @@ public class ITDistributedTableTest {
     public void partitionListener() throws Exception {
         String grpId = "part";
 
-        RaftServer partSrv = new RaftServerImpl(
-            cluster.get(0),
-            FACTORY,
-            true
-        );
+        RaftServer partSrv = new RaftServerImpl(cluster.get(0), FACTORY);
 
         List<Peer> conf = List.of(new Peer(cluster.get(0).topologyService().localMember().address()));
 
         partSrv.startRaftGroup(grpId, new PartitionListener(), conf);
 
-        RaftGroupService partRaftGrp = new RaftGroupServiceImpl(grpId, client, FACTORY, 10_000, conf, true, 200, true);
+        RaftGroupService partRaftGrp = new RaftGroupServiceImpl(grpId, client, FACTORY, 10_000, conf, true, 200);
 
         Row testRow = getTestRow();
 
@@ -222,13 +218,8 @@ public class ITDistributedTableTest {
     public void partitionedTable() {
         HashMap<ClusterNode, RaftServer> raftServers = new HashMap<>(NODES);
 
-        for (int i = 0; i < NODES; i++) {
-            raftServers.put(cluster.get(i).topologyService().localMember(), new RaftServerImpl(
-                cluster.get(i),
-                FACTORY,
-                true
-            ));
-        }
+        for (int i = 0; i < NODES; i++)
+            raftServers.put(cluster.get(i).topologyService().localMember(), new RaftServerImpl(cluster.get(i), FACTORY));
 
         List<List<ClusterNode>> assignment = RendezvousAffinityFunction.assignPartitions(
             cluster.stream().map(node -> node.topologyService().localMember()).collect(Collectors.toList()),
@@ -251,7 +242,7 @@ public class ITDistributedTableTest {
 
             rs.startRaftGroup(grpId, new PartitionListener(), conf);
 
-            partMap.put(p, new RaftGroupServiceImpl(grpId, client, FACTORY, 10_000, conf, true, 200, true));
+            partMap.put(p, new RaftGroupServiceImpl(grpId, client, FACTORY, 10_000, conf, true, 200));
 
             p++;
         }