You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/11/30 13:21:55 UTC

[ignite-3] branch main updated: IGNITE-15528 Fixed a thread leak in jraft, introduced shared pools mode.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c97228  IGNITE-15528 Fixed a thread leak in jraft, introduced shared pools mode.
2c97228 is described below

commit 2c9722861f990d8b77359257fc1b32862fd18236
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Tue Nov 30 16:21:36 2021 +0300

    IGNITE-15528 Fixed a thread leak in jraft, introduced shared pools mode.
---
 .../ignite/raft/jraft/core/ItCliServiceTest.java   |   7 +-
 .../apache/ignite/raft/jraft/core/ItNodeTest.java  |  76 +---
 .../raft/server/ItJraftCounterServerTest.java      | 138 +++++++-
 .../internal/raft/server/impl/JraftServerImpl.java |  40 +++
 .../org/apache/ignite/raft/jraft/JRaftUtils.java   |  52 +--
 .../apache/ignite/raft/jraft/core/NodeImpl.java    | 392 +++++++++++++--------
 .../raft/jraft/core/ReadOnlyServiceImpl.java       |   2 +-
 .../apache/ignite/raft/jraft/core/Replicator.java  |   2 +-
 .../raft/jraft/core/StateMachineAdapter.java       |   7 +
 .../ignite/raft/jraft/option/NodeOptions.java      | 212 +++++++----
 .../ignite/raft/jraft/option/RpcOptions.java       |  67 +---
 .../ignite/raft/jraft/rpc/ClientService.java       |  14 +-
 .../raft/jraft/rpc/impl/AbstractClientService.java |  84 ++---
 .../raft/jraft/rpc/impl/IgniteRpcClient.java       |  26 +-
 .../raft/jraft/rpc/impl/IgniteRpcServer.java       |  11 +-
 .../raft/jraft/rpc/impl/PingRequestProcessor.java  |   5 +
 .../raft/jraft/storage/impl/LogManagerImpl.java    |   2 +-
 .../storage/snapshot/SnapshotExecutorImpl.java     |   2 +-
 .../ignite/raft/jraft/util/timer/DefaultTimer.java |   2 +-
 .../internal/raft/server/impl/RaftServerImpl.java  |   4 +-
 .../ignite/raft/jraft/core/MockStateMachine.java   |   5 -
 .../raft/jraft/core/ReadOnlyServiceTest.java       |   1 -
 .../apache/ignite/raft/jraft/core/TestCluster.java | 104 ++----
 .../raft/jraft/storage/SnapshotExecutorTest.java   |   2 +-
 .../apache/ignite/raft/jraft/test/TestUtils.java   |  13 +
 25 files changed, 737 insertions(+), 533 deletions(-)

diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
index eea4acc..c3993b3 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
@@ -154,6 +154,8 @@ public class ItCliServiceTest {
         cluster.stopAll();
         ExecutorServiceHelper.shutdownAndAwaitTermination(clientExecutor);
 
+        TestUtils.assertAllJraftThreadsStopped();
+
         LOG.info(">>>>>>>>>>>>>>> End test method: " + testInfo.getDisplayName());
     }
 
@@ -314,7 +316,8 @@ public class ItCliServiceTest {
         assertNotNull(oldLeaderNode);
         PeerId oldLeader = oldLeaderNode.getNodeId().getPeerId();
         assertNotNull(oldLeader);
-        assertTrue(cliService.changePeers(groupId, conf, new Configuration(newPeers)).isOk());
+        Status status = cliService.changePeers(groupId, conf, new Configuration(newPeers));
+        assertTrue(status.isOk(), status.getErrorMsg());
         cluster.waitLeader();
         PeerId newLeader = cluster.getLeader().getNodeId().getPeerId();
         assertNotEquals(oldLeader, newLeader);
@@ -479,7 +482,7 @@ public class ItCliServiceTest {
         for (Map.Entry<String, PeerId> entry : rebalancedLeaderIds.entrySet())
             ret.compute(entry.getValue(), (ignored, num) -> num == null ? 1 : num + 1);
         for (Map.Entry<PeerId, Integer> entry : ret.entrySet()) {
-            System.out.println(entry);
+            LOG.info(entry.toString());
             assertEquals(new PeerId("host_1", 8080), entry.getKey());
         }
     }
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 8240aac..2e403ae 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -60,7 +60,6 @@ import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
 import org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
 import org.apache.ignite.raft.jraft.closure.TaskClosure;
 import org.apache.ignite.raft.jraft.conf.Configuration;
-import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.EnumOutter;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.entity.Task;
@@ -81,7 +80,6 @@ import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
 import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
 import org.apache.ignite.raft.jraft.rpc.impl.core.DefaultRaftClientService;
 import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
-import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
 import org.apache.ignite.raft.jraft.storage.snapshot.ThroughputSnapshotThrottle;
 import org.apache.ignite.raft.jraft.test.TestUtils;
@@ -159,12 +157,7 @@ public class ItNodeTest {
     private final List<FixedThreadsExecutorGroup> appendEntriesExecutors = new ArrayList<>();
 
     /** Test info. */
-    private final TestInfo testInfo;
-
-    /** */
-    public ItNodeTest(TestInfo testInfo) {
-        this.testInfo = testInfo;
-    }
+    private TestInfo testInfo;
 
     @BeforeAll
     public static void setupNodeTest() {
@@ -182,9 +175,10 @@ public class ItNodeTest {
     }
 
     @BeforeEach
-    public void setup(@WorkDirectory Path workDir) throws Exception {
+    public void setup(TestInfo testInfo, @WorkDirectory Path workDir) throws Exception {
         LOG.info(">>>>>>>>>>>>>>> Start test method: " + testInfo.getDisplayName());
 
+        this.testInfo = testInfo;
         dataPath = workDir.toString();
 
         testStartMs = Utils.monotonicMs();
@@ -211,6 +205,9 @@ public class ItNodeTest {
 
         startedCounter.set(0);
         stoppedCounter.set(0);
+
+        TestUtils.assertAllJraftThreadsStopped();
+
         LOG.info(">>>>>>>>>>>>>>> End test method: " + testInfo.getDisplayName() + ", cost:"
             + (Utils.monotonicMs() - testStartMs) + " ms.");
     }
@@ -260,8 +257,9 @@ public class ItNodeTest {
         AtomicInteger c = new AtomicInteger(0);
         for (int i = 0; i < 10; i++) {
             ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes());
+            int finalI = i;
             Task task = new Task(data, new JoinableClosure(status -> {
-                System.out.println(status);
+                LOG.info("{} i={}", status, finalI);
                 if (!status.isOk()) {
                     assertTrue(
                         status.getRaftError() == RaftError.EBUSY || status.getRaftError() == RaftError.EPERM);
@@ -1509,10 +1507,8 @@ public class ItNodeTest {
         PeerId oldLeader = leader.getNodeId().getPeerId();
         assertTrue(cluster.stop(leader.getNodeId().getPeerId().getEndpoint()));
 
-        // apply something when follower
-        //final List<Node> followers = cluster.getFollowers();
         assertFalse(followers.isEmpty());
-        sendTestTaskAndWait("follower apply ", followers.get(0), -1);
+        sendTestTaskAndWait("follower apply ", followers.get(0), -1); // Should fail, because no leader.
 
         for (Node follower : followers) {
             NodeImpl follower0 = (NodeImpl) follower;
@@ -1551,7 +1547,7 @@ public class ItNodeTest {
         cluster.clean(oldLeader.getEndpoint());
 
         // restart old leader
-        LOG.info("restart old leader {}", oldLeader);
+        LOG.info("Restart old leader with cleanup {}", oldLeader);
         assertTrue(cluster.start(oldLeader.getEndpoint()));
         cluster.ensureSame();
 
@@ -2976,7 +2972,8 @@ public class ItNodeTest {
         done.reset();
         // works
         leader.changePeers(conf, done);
-        assertTrue(done.await().isOk());
+        Status await = done.await();
+        assertTrue(await.isOk(), await.getErrorMsg());
 
         cluster.ensureSame();
         assertEquals(3, cluster.getFsms().size());
@@ -3391,16 +3388,7 @@ public class ItNodeTest {
     }
 
     private NodeOptions createNodeOptions() {
-        NodeOptions options = new NodeOptions();
-
-        ExecutorService executor = JRaftUtils.createCommonExecutor(options);
-        executors.add(executor);
-        options.setCommonExecutor(executor);
-        FixedThreadsExecutorGroup appendEntriesExecutor = JRaftUtils.createAppendEntriesExecutor(options);
-        appendEntriesExecutors.add(appendEntriesExecutor);
-        options.setStripedExecutor(appendEntriesExecutor);
-
-        return options;
+        return new NodeOptions();
     }
 
     /**
@@ -3478,35 +3466,6 @@ public class ItNodeTest {
         Configuration initialConf = nodeOptions.getInitialConf();
         nodeOptions.setStripes(1);
 
-        StripedDisruptor<FSMCallerImpl.ApplyTask> fsmCallerDusruptor;
-        StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeDisruptor;
-        StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor;
-        StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
-
-        nodeOptions.setfSMCallerExecutorDisruptor(fsmCallerDusruptor = new StripedDisruptor<>(
-            "JRaft-FSMCaller-Disruptor_ITNodeTest",
-            nodeOptions.getRaftOptions().getDisruptorBufferSize(),
-            () -> new FSMCallerImpl.ApplyTask(),
-            nodeOptions.getStripes()));
-
-        nodeOptions.setNodeApplyDisruptor(nodeDisruptor = new StripedDisruptor<>(
-            "JRaft-NodeImpl-Disruptor_ITNodeTest",
-            nodeOptions.getRaftOptions().getDisruptorBufferSize(),
-            () -> new NodeImpl.LogEntryAndClosure(),
-            nodeOptions.getStripes()));
-
-        nodeOptions.setReadOnlyServiceDisruptor(readOnlyServiceDisruptor = new StripedDisruptor<>(
-            "JRaft-ReadOnlyService-Disruptor_ITNodeTest",
-            nodeOptions.getRaftOptions().getDisruptorBufferSize(),
-            () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
-            nodeOptions.getStripes()));
-
-        nodeOptions.setLogManagerDisruptor(logManagerDisruptor = new StripedDisruptor<>(
-            "JRaft-LogManager-Disruptor_ITNodeTest",
-            nodeOptions.getRaftOptions().getDisruptorBufferSize(),
-            () -> new LogManagerImpl.StableClosureEvent(),
-            nodeOptions.getStripes()));
-
         Stream<PeerId> peers = initialConf == null ?
             Stream.empty() :
             Stream.concat(initialConf.getPeers().stream(), initialConf.getLearners().stream());
@@ -3538,14 +3497,11 @@ public class ItNodeTest {
 
         var service = new RaftGroupService(groupId, peerId, nodeOptions, rpcServer, nodeManager) {
             @Override public synchronized void shutdown() {
-                super.shutdown();
+                rpcServer.shutdown();
 
+                super.shutdown();
+    
                 clusterService.stop();
-
-                fsmCallerDusruptor.shutdown();
-                nodeDisruptor.shutdown();
-                readOnlyServiceDisruptor.shutdown();
-                logManagerDisruptor.shutdown();
             }
         };
 
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
index 4d9124d..7a4770d 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.raft.server;
 
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.raft.jraft.core.State.STATE_ERROR;
 import static org.apache.ignite.raft.jraft.core.State.STATE_LEADER;
 import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
@@ -35,6 +38,9 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -61,9 +67,12 @@ import org.apache.ignite.raft.client.WriteCommand;
 import org.apache.ignite.raft.client.service.CommandClosure;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.jraft.test.TestUtils;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -149,6 +158,8 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
     @AfterEach
     @Override
     protected void after() throws Exception {
+        super.after();
+
         LOG.info("Start client shutdown");
 
         Iterator<RaftGroupService> iterClients = clients.iterator();
@@ -170,8 +181,11 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
 
             iterSrv.remove();
 
-            server.stopRaftGroup(COUNTER_GROUP_0);
-            server.stopRaftGroup(COUNTER_GROUP_1);
+            Set<String> grps = server.startedGroups();
+
+            for (String grp : grps) {
+                server.stopRaftGroup(grp);
+            }
 
             server.beforeNodeStop();
 
@@ -180,7 +194,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
 
         IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
 
-        super.after();
+        TestUtils.assertAllJraftThreadsStopped();
 
         LOG.info(">>>>>>>>>>>>>>> End test method: {}", testInfo.getTestMethod().orElseThrow().getName());
     }
@@ -189,14 +203,21 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
      * Starts server.
      *
      * @param idx The index.
+     * @param clo Init closure.
+     * @param cons Node options updater.
+     *
      * @return Raft server instance.
      */
-    private JraftServerImpl startServer(int idx, Consumer<RaftServer> clo) {
+    private JraftServerImpl startServer(int idx, Consumer<RaftServer> clo, Consumer<NodeOptions> cons) {
         var addr = new NetworkAddress(getLocalAddress(), PORT);
 
         ClusterService service = clusterService(PORT + idx, List.of(addr), true);
 
-        JraftServerImpl server = new JraftServerImpl(service, dataPath) {
+        NodeOptions opts = new NodeOptions();
+
+        cons.accept(opts);
+
+        JraftServerImpl server = new JraftServerImpl(service, dataPath, opts) {
             @Override
             public void stop() {
                 servers.remove(this);
@@ -248,7 +269,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
             startServer(i, raftServer -> {
                 raftServer.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF);
                 raftServer.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF);
-            });
+            }, opts -> {});
         }
 
         startClient(COUNTER_GROUP_0);
@@ -262,13 +283,13 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
     public void testDisruptorThreadsCount() {
         startServer(0, raftServer -> {
             raftServer.startRaftGroup("test_raft_group", listenerFactory.get(), INITIAL_CONF);
-        });
+        }, opts -> {});
 
         Set<Thread> threads = getAllDisruptorCurrentThreads();
 
         int threadsBefore = threads.size();
 
-        Set<String> threadNamesBefore = threads.stream().map(Thread::getName).collect(Collectors.toSet());
+        Set<String> threadNamesBefore = threads.stream().map(Thread::getName).collect(toSet());
 
         assertEquals(NodeOptions.DEFAULT_STRIPES * 4/*services*/, threadsBefore, "Started thread names: " + threadNamesBefore);
 
@@ -282,7 +303,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
 
         int threadsAfter = threads.size();
 
-        Set<String> threadNamesAfter = threads.stream().map(Thread::getName).collect(Collectors.toSet());
+        Set<String> threadNamesAfter = threads.stream().map(Thread::getName).collect(toSet());
 
         threadNamesAfter.removeAll(threadNamesBefore);
 
@@ -305,11 +326,11 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
     @NotNull
     private Set<Thread> getAllDisruptorCurrentThreads() {
         return Thread.getAllStackTraces().keySet().stream().filter(t ->
-                t.getName().contains("JRaft-FSMCaller-Disruptor")
-                        || t.getName().contains("JRaft-NodeImpl-Disruptor")
-                        || t.getName().contains("JRaft-ReadOnlyService-Disruptor")
-                        || t.getName().contains("JRaft-LogManager-Disruptor"))
-                .collect(Collectors.toSet());
+                        t.getName().contains("JRaft-FSMCaller-Disruptor")
+                                || t.getName().contains("JRaft-NodeImpl-Disruptor")
+                                || t.getName().contains("JRaft-ReadOnlyService-Disruptor")
+                                || t.getName().contains("JRaft-LogManager-Disruptor"))
+                .collect(toSet());
     }
 
     @Test
@@ -652,6 +673,91 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
         doTestFollowerCatchUp(true, false);
     }
 
+    /** Tests if a starting a new group in shared pools mode doesn't increases timer threads count. */
+    @Test
+    public void testTimerThreadsCount() {
+        JraftServerImpl srv0 = startServer(0, x -> {}, opts -> opts.setTimerPoolSize(1));
+        JraftServerImpl srv1 = startServer(1, x -> {}, opts -> opts.setTimerPoolSize(1));
+        JraftServerImpl srv2 = startServer(2, x -> {}, opts -> opts.setTimerPoolSize(1));
+
+        waitForTopology(srv0.clusterService(), 3, 5_000);
+
+        ExecutorService svc = Executors.newFixedThreadPool(16);
+
+        final int groupsCnt = 10;
+
+        try {
+            List<Future<?>> futs = new ArrayList<>(groupsCnt);
+
+            for (int i = 0; i < groupsCnt; i++) {
+                int finalI = i;
+                futs.add(svc.submit(new Runnable() {
+                    @Override public void run() {
+                        String grp = "counter" + finalI;
+
+                        srv0.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF);
+                        srv1.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF);
+                        srv2.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF);
+                    }
+                }));
+            }
+
+            for (Future<?> fut : futs) {
+                try {
+                    fut.get();
+                } catch (Exception e) {
+                    fail(e.getMessage());
+                }
+            }
+        } finally {
+            ExecutorServiceHelper.shutdownAndAwaitTermination(svc);
+        }
+
+        for (int i = 0; i < groupsCnt; i++) {
+            String grp = "counter" + i;
+
+            assertTrue(waitForCondition(() -> hasLeader(grp), 30_000));
+        }
+
+        Set<Thread> threads = Thread.getAllStackTraces().keySet();
+
+        LOG.info("RAFT threads count {}", threads.stream().filter(t -> t.getName().contains("JRaft")).count());
+
+        List<Thread> timerThreads = threads.stream().filter(this::isTimer).sorted(comparing(Thread::getName)).collect(toList());
+
+        assertTrue(timerThreads.size() <= 15, // This is a maximum possible number of a timer threads for 3 nodes in this test.
+                "All timer threads: " + timerThreads.toString());
+    }
+
+    /**
+     * Returns {@code true} if thread is related to timers.
+     *
+     * @param thread The thread.
+     * @return {@code True} if a timer thread.
+     */
+    private boolean isTimer(Thread thread) {
+        String name = thread.getName();
+
+        return name.contains("ElectionTimer") || name.contains("VoteTimer")
+                || name.contains("StepDownTimer") || name.contains("SnapshotTimer") || name.contains("Node-Scheduler");
+    }
+
+    /**
+     * Returns {@code true} if a raft group has elected a leader for a some term.
+     *
+     * @param grpId Group id.
+     * @return {@code True} if a leader is elected.
+     */
+    private boolean hasLeader(String grpId) {
+        return servers.stream().anyMatch(s -> {
+            NodeImpl node = (NodeImpl) s.raftGroupService(grpId).getRaftNode();
+
+            StateMachineAdapter fsm = (StateMachineAdapter) node.getOptions().getFsm();
+
+            return node.isLeader() && fsm.getLeaderTerm() == node.getCurrentTerm();
+        });
+    }
+
     /**
      * Do test follower catch up.
      *
@@ -721,7 +827,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
         var svc2 = startServer(stopIdx, r -> {
             r.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF);
             r.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF);
-        });
+        }, opts -> {});
 
         waitForCondition(() -> validateStateMachine(sum(20), svc2, COUNTER_GROUP_0), 5_000);
         waitForCondition(() -> validateStateMachine(sum(30), svc2, COUNTER_GROUP_1), 5_000);
@@ -736,7 +842,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
         var svc3 = startServer(stopIdx, r -> {
             r.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF);
             r.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF);
-        });
+        }, opts -> {});
 
         waitForCondition(() -> validateStateMachine(sum(20), svc3, COUNTER_GROUP_0), 5_000);
         waitForCondition(() -> validateStateMachine(sum(30), svc3, COUNTER_GROUP_1), 5_000);
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index e33fd47..30f2c01 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -115,6 +115,11 @@ public class JraftServerImpl implements RaftServer {
         this.nodeManager = new NodeManager();
         this.opts = opts;
 
+        // Auto-adjust options.
+        this.opts.setRpcConnectTimeoutMs(this.opts.getElectionTimeoutMs() / 3);
+        this.opts.setRpcDefaultTimeout(this.opts.getElectionTimeoutMs() / 2);
+        this.opts.setSharedPools(true);
+
         if (opts.getServerName() == null) {
             opts.setServerName(service.localConfiguration().getName());
         }
@@ -123,6 +128,9 @@ public class JraftServerImpl implements RaftServer {
     /** {@inheritDoc} */
     @Override
     public void start() {
+        assert opts.isSharedPools() : "RAFT server is supposed to run in shared pools mode";
+
+        // Pre-create all pools in shared mode.
         if (opts.getCommonExecutor() == null) {
             opts.setCommonExecutor(JRaftUtils.createCommonExecutor(opts));
         }
@@ -139,6 +147,22 @@ public class JraftServerImpl implements RaftServer {
             opts.setClientExecutor(JRaftUtils.createClientExecutor(opts, opts.getServerName()));
         }
 
+        if (opts.getVoteTimer() == null) {
+            opts.setVoteTimer(JRaftUtils.createTimer(opts, "JRaft-VoteTimer"));
+        }
+
+        if (opts.getElectionTimer() == null) {
+            opts.setElectionTimer(JRaftUtils.createTimer(opts, "JRaft-ElectionTimer"));
+        }
+
+        if (opts.getStepDownTimer() == null) {
+            opts.setStepDownTimer(JRaftUtils.createTimer(opts, "JRaft-StepDownTimer"));
+        }
+
+        if (opts.getSnapshotTimer() == null) {
+            opts.setSnapshotTimer(JRaftUtils.createTimer(opts, "JRaft-SnapshotTimer"));
+        }
+
         requestExecutor = JRaftUtils.createRequestExecutor(opts);
 
         rpcServer = new IgniteRpcServer(
@@ -218,6 +242,22 @@ public class JraftServerImpl implements RaftServer {
             opts.getScheduler().shutdown();
         }
 
+        if (opts.getElectionTimer() != null) {
+            opts.getElectionTimer().stop();
+        }
+
+        if (opts.getVoteTimer() != null) {
+            opts.getVoteTimer().stop();
+        }
+
+        if (opts.getStepDownTimer() != null) {
+            opts.getStepDownTimer().stop();
+        }
+
+        if (opts.getSnapshotTimer() != null) {
+            opts.getSnapshotTimer().stop();
+        }
+
         if (opts.getClientExecutor() != null) {
             ExecutorServiceHelper.shutdownAndAwaitTermination(opts.getClientExecutor());
         }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
index 6ad6349..5f5c459 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
@@ -24,23 +24,21 @@ import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.jraft.conf.Configuration;
-import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
 import org.apache.ignite.raft.jraft.core.NodeImpl;
-import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
 import org.apache.ignite.raft.jraft.core.Scheduler;
 import org.apache.ignite.raft.jraft.core.TimerManager;
-import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.option.BootstrapOptions;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.option.RpcOptions;
-import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
 import org.apache.ignite.raft.jraft.util.Endpoint;
 import org.apache.ignite.raft.jraft.util.StringUtils;
 import org.apache.ignite.raft.jraft.util.ThreadPoolUtil;
 import org.apache.ignite.raft.jraft.util.Utils;
 import org.apache.ignite.raft.jraft.util.concurrent.DefaultFixedThreadsExecutorGroupFactory;
 import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
+import org.apache.ignite.raft.jraft.util.timer.DefaultTimer;
+import org.apache.ignite.raft.jraft.util.timer.Timer;
 
 /**
  * Some helper methods for jraft usage.
@@ -59,44 +57,10 @@ public final class JRaftUtils {
 
         nodeOpts.setStripes(1);
 
-        StripedDisruptor<FSMCallerImpl.ApplyTask> fsmCallerDusruptor;
-        StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeDisruptor;
-        StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor;
-        StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
-
-        nodeOpts.setfSMCallerExecutorDisruptor(fsmCallerDusruptor = new StripedDisruptor<>(
-            "JRaft-FSMCaller-Disruptor_bootstrap",
-            nodeOpts.getRaftOptions().getDisruptorBufferSize(),
-            () -> new FSMCallerImpl.ApplyTask(),
-            nodeOpts.getStripes()));
-
-        nodeOpts.setNodeApplyDisruptor(nodeDisruptor = new StripedDisruptor<>(
-            "JRaft-NodeImpl-Disruptor_bootstrap",
-            nodeOpts.getRaftOptions().getDisruptorBufferSize(),
-            () -> new NodeImpl.LogEntryAndClosure(),
-            nodeOpts.getStripes()));
-
-        nodeOpts.setReadOnlyServiceDisruptor(readOnlyServiceDisruptor = new StripedDisruptor<>(
-            "JRaft-ReadOnlyService-Disruptor_bootstrap",
-            nodeOpts.getRaftOptions().getDisruptorBufferSize(),
-            () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
-            nodeOpts.getStripes()));
-
-        nodeOpts.setLogManagerDisruptor(logManagerDisruptor = new StripedDisruptor<>(
-            "JRaft-LogManager-Disruptor_bootstrap",
-            nodeOpts.getRaftOptions().getDisruptorBufferSize(),
-            () -> new LogManagerImpl.StableClosureEvent(),
-            nodeOpts.getStripes()));
-
         final boolean ret = node.bootstrap(opts);
         node.shutdown();
         node.join();
 
-        fsmCallerDusruptor.shutdown();
-        nodeDisruptor.shutdown();
-        readOnlyServiceDisruptor.shutdown();
-        logManagerDisruptor.shutdown();
-
         return ret;
     }
 
@@ -187,6 +151,18 @@ public final class JRaftUtils {
     }
 
     /**
+     * @param opts Node options.
+     * @param name The name.
+     * @return The timer.
+     */
+    public static Timer createTimer(NodeOptions opts, String name) {
+        return new DefaultTimer(
+            opts.getTimerPoolSize(),
+            NamedThreadFactory.threadPrefix(opts.getServerName(), name)
+        );
+    }
+
+    /**
      * Create a striped executor.
      *
      * @param prefix Thread name prefix.
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 4fce934..8957c46 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -33,11 +33,13 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.jraft.Closure;
 import org.apache.ignite.raft.jraft.FSMCaller;
 import org.apache.ignite.raft.jraft.JRaftServiceFactory;
+import org.apache.ignite.raft.jraft.JRaftUtils;
 import org.apache.ignite.raft.jraft.Node;
 import org.apache.ignite.raft.jraft.ReadOnlyService;
 import org.apache.ignite.raft.jraft.ReplicatorGroup;
@@ -105,6 +107,7 @@ import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl;
 import org.apache.ignite.raft.jraft.util.ByteString;
 import org.apache.ignite.raft.jraft.util.Describer;
 import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
 import org.apache.ignite.raft.jraft.util.OnlyForTest;
 import org.apache.ignite.raft.jraft.util.RepeatedTimer;
 import org.apache.ignite.raft.jraft.util.Requires;
@@ -114,7 +117,6 @@ import org.apache.ignite.raft.jraft.util.ThreadHelper;
 import org.apache.ignite.raft.jraft.util.ThreadId;
 import org.apache.ignite.raft.jraft.util.Utils;
 import org.apache.ignite.raft.jraft.util.concurrent.LongHeldDetectingReadWriteLock;
-import org.apache.ignite.raft.jraft.util.timer.RaftTimerFactory;
 
 /**
  * The raft replica node implementation.
@@ -170,19 +172,13 @@ public class NodeImpl implements Node, RaftServerService {
     private final List<Closure> shutdownContinuations = new ArrayList<>();
     private RaftClientService rpcClientService;
     private ReadOnlyService readOnlyService;
+
     /**
      * Timers
      */
-    private Scheduler timerManager;
     private RepeatedTimer electionTimer;
     private RepeatedTimer voteTimer;
-    private RepeatedTimer stepDownTimer;
-
-    private boolean sharedTimerManager;
-
-    /**
-     * Triggered on a leader each electionTimeoutMs / 2 milliseconds to ensure the alive quorum.
-     */
+    private RepeatedTimer stepDownTimer; // Triggered on a leader each electionTimeoutMs / 2 millis to ensure the quorum.
     private RepeatedTimer snapshotTimer;
     private ScheduledFuture<?> transferTimer;
     private ThreadId wakingCandidate;
@@ -214,11 +210,6 @@ public class NodeImpl implements Node, RaftServerService {
      */
     private volatile int electionTimeoutCounter;
 
-    /**
-     * Timer factory.
-     */
-    private RaftTimerFactory timerFactory;
-
     private static class NodeReadWriteLock extends LongHeldDetectingReadWriteLock {
         static final long MAX_BLOCKING_MS_TO_REPORT = SystemPropertyUtil.getLong(
             "jraft.node.detecting.lock.max_blocking_ms_to_report", -1);
@@ -788,6 +779,8 @@ public class NodeImpl implements Node, RaftServerService {
         // Create fsmCaller at first as logManager needs it to report error
         this.fsmCaller = new FSMCallerImpl();
 
+        initPools(opts.getNodeOptions());
+
         if (!initLogStorage()) {
             LOG.error("Fail to init log storage.");
             return false;
@@ -873,7 +866,6 @@ public class NodeImpl implements Node, RaftServerService {
         this.metrics = new NodeMetrics(opts.isEnableMetrics());
         this.serverId.setPriority(opts.getElectionPriority());
         this.electionTimeoutCounter = 0;
-        this.timerFactory = opts.getServiceFactory().createRaftTimerFactory();
         if (opts.getReplicationStateListeners() != null)
             this.replicatorStateListeners.addAll(opts.getReplicationStateListeners());
 
@@ -882,77 +874,11 @@ public class NodeImpl implements Node, RaftServerService {
             return false;
         }
 
-        // Init timers
-        final String suffix = getOptions().getServerName() + "-";
-
-        if (getOptions().getScheduler() == null)
-            timerManager = timerFactory.createScheduler(this.options.getTimerPoolSize(),
-            "JRaft-Node-ScheduleThreadPool-" + suffix);
-        else {
-            sharedTimerManager = true;
-            timerManager = getOptions().getScheduler();
-        }
-
-        String name = "JRaft-VoteTimer-" + suffix;
-        this.voteTimer = new RepeatedTimer(name, options.getElectionTimeoutMs(), timerFactory.getVoteTimer(name)) {
-            @Override
-            protected void onTrigger() {
-                handleVoteTimeout();
-            }
-
-            @Override
-            protected int adjustTimeout(final int timeoutMs) {
-                return randomTimeout(timeoutMs);
-            }
-        };
-
-        name = "JRaft-ElectionTimer-" + suffix;
-        electionTimer = new RepeatedTimer(name, options.getElectionTimeoutMs(), timerFactory.getElectionTimer(name)) {
-            @Override
-            protected void onTrigger() {
-                handleElectionTimeout();
-            }
-
-            @Override
-            protected int adjustTimeout(final int timeoutMs) {
-                return randomTimeout(timeoutMs);
-            }
-        };
-
-        name = "JRaft-StepDownTimer-" + suffix;
-        stepDownTimer = new RepeatedTimer(name, options.getElectionTimeoutMs() >> 1, timerFactory.getStepDownTimer(name)) {
-            @Override
-            protected void onTrigger() {
-                handleStepDownTimeout();
-            }
-        };
-
-        name = "JRaft-SnapshotTimer-" + suffix;
-        snapshotTimer = new RepeatedTimer(name, options.getSnapshotIntervalSecs() * 1000, timerFactory.getSnapshotTimer(name)) {
-            private volatile boolean firstSchedule = true;
-
-            @Override
-            protected void onTrigger() {
-                handleSnapshotTimeout();
-            }
-
-            @Override
-            protected int adjustTimeout(final int timeoutMs) {
-                if (!this.firstSchedule) {
-                    return timeoutMs;
-                }
+        // Init timers.
+        initTimers(opts);
 
-                // Randomize the first snapshot trigger timeout
-                this.firstSchedule = false;
-                if (timeoutMs > 0) {
-                    int half = timeoutMs / 2;
-                    return half + ThreadLocalRandom.current().nextInt(half);
-                }
-                else {
-                    return timeoutMs;
-                }
-            }
-        };
+        // Init pools.
+        initPools(opts);
 
         this.configManager = new ConfigurationManager();
 
@@ -1027,7 +953,7 @@ public class NodeImpl implements Node, RaftServerService {
         rgOpts.setRaftRpcClientService(this.rpcClientService);
         rgOpts.setSnapshotStorage(this.snapshotExecutor != null ? this.snapshotExecutor.getSnapshotStorage() : null);
         rgOpts.setRaftOptions(this.raftOptions);
-        rgOpts.setTimerManager(this.timerManager);
+        rgOpts.setTimerManager(this.options.getScheduler());
 
         // Adds metric registry to RPC service.
         this.options.setMetricRegistry(this.metrics.getMetricRegistry());
@@ -1083,6 +1009,146 @@ public class NodeImpl implements Node, RaftServerService {
         return true;
     }
 
+    /**
+     * Validates a required option if shared pools are enabled.
+     *
+     * @param opts Options.
+     * @param name Option name.
+     */
+    private boolean validateOption(NodeOptions opts, String name) {
+        if (opts.isSharedPools())
+            throw new IllegalArgumentException(name + " is required if shared pools are enabled");
+
+        return true;
+    }
+
+
+    /**
+     * Initialize timer pools.
+     * @param opts The options.
+     */
+    private void initTimers(final NodeOptions opts) {
+        if (opts.getScheduler() == null && validateOption(opts, "scheduler"))
+            opts.setScheduler(JRaftUtils.createScheduler(opts));
+
+        String name = "JRaft-VoteTimer";
+        if (opts.getVoteTimer() == null && validateOption(opts, "voteTimer")) {
+            opts.setVoteTimer(JRaftUtils.createTimer(opts, name));
+        }
+
+        this.voteTimer = new RepeatedTimer(name, NodeImpl.this.options.getElectionTimeoutMs(), opts.getVoteTimer()) {
+            @Override
+            protected void onTrigger() {
+                handleVoteTimeout();
+            }
+
+            @Override
+            protected int adjustTimeout(final int timeoutMs) {
+                return randomTimeout(timeoutMs);
+            }
+        };
+
+        name = "JRaft-ElectionTimer";
+        if (opts.getElectionTimer() == null && validateOption(opts, "electionTimer"))
+            opts.setElectionTimer(JRaftUtils.createTimer(opts, name));
+        electionTimer = new RepeatedTimer(name, NodeImpl.this.options.getElectionTimeoutMs(), opts.getElectionTimer()) {
+            @Override
+            protected void onTrigger() {
+                handleElectionTimeout();
+            }
+
+            @Override
+            protected int adjustTimeout(final int timeoutMs) {
+                return randomTimeout(timeoutMs);
+            }
+        };
+
+        name = "JRaft-StepDownTimer";
+        if (opts.getStepDownTimer() == null && validateOption(opts, "stepDownTimer"))
+            opts.setStepDownTimer(JRaftUtils.createTimer(opts, name));
+        stepDownTimer = new RepeatedTimer(name, NodeImpl.this.options.getElectionTimeoutMs() >> 1, opts.getStepDownTimer()) {
+            @Override
+            protected void onTrigger() {
+                handleStepDownTimeout();
+            }
+        };
+
+        name = "JRaft-SnapshotTimer";
+        if (opts.getSnapshotTimer() == null && validateOption(opts, "snapshotTimer"))
+            opts.setSnapshotTimer(JRaftUtils.createTimer(opts, name));
+        snapshotTimer = new RepeatedTimer(name, NodeImpl.this.options.getSnapshotIntervalSecs() * 1000, opts.getSnapshotTimer()) {
+            private volatile boolean firstSchedule = true;
+
+            @Override
+            protected void onTrigger() {
+                handleSnapshotTimeout();
+            }
+
+            @Override
+            protected int adjustTimeout(final int timeoutMs) {
+                if (!this.firstSchedule) {
+                    return timeoutMs;
+                }
+
+                // Randomize the first snapshot trigger timeout
+                this.firstSchedule = false;
+                if (timeoutMs > 0) {
+                    int half = timeoutMs / 2;
+                    return half + ThreadLocalRandom.current().nextInt(half);
+                }
+                else {
+                    return timeoutMs;
+                }
+            }
+        };
+    }
+
+    /**
+     * @param opts Options.
+     */
+    private void initPools(final NodeOptions opts) {
+        if (opts.getCommonExecutor() == null && validateOption(opts, "commonExecutor"))
+            opts.setCommonExecutor(JRaftUtils.createCommonExecutor(opts));
+
+        if (opts.getStripedExecutor() == null && validateOption(opts, "stripedExecutor"))
+            opts.setStripedExecutor(JRaftUtils.createAppendEntriesExecutor(opts));
+
+        if (opts.getClientExecutor() == null && validateOption(opts, "clientExecutor"))
+            opts.setClientExecutor(JRaftUtils.createClientExecutor(opts, opts.getServerName()));
+
+        if (opts.getfSMCallerExecutorDisruptor() == null) {
+            opts.setfSMCallerExecutorDisruptor(new StripedDisruptor<FSMCallerImpl.ApplyTask>(
+                NamedThreadFactory.threadPrefix(opts.getServerName(), "JRaft-FSMCaller-Disruptor"),
+                opts.getRaftOptions().getDisruptorBufferSize(),
+                () -> new FSMCallerImpl.ApplyTask(),
+                opts.getStripes()));
+        }
+
+        if (opts.getNodeApplyDisruptor() == null) {
+            opts.setNodeApplyDisruptor(new StripedDisruptor<NodeImpl.LogEntryAndClosure>(
+                NamedThreadFactory.threadPrefix(opts.getServerName(), "JRaft-NodeImpl-Disruptor"),
+                opts.getRaftOptions().getDisruptorBufferSize(),
+                () -> new NodeImpl.LogEntryAndClosure(),
+                opts.getStripes()));
+        }
+
+        if (opts.getReadOnlyServiceDisruptor() == null) {
+            opts.setReadOnlyServiceDisruptor(new StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent>(
+                NamedThreadFactory.threadPrefix(opts.getServerName(), "JRaft-ReadOnlyService-Disruptor"),
+                opts.getRaftOptions().getDisruptorBufferSize(),
+                () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
+                opts.getStripes()));
+        }
+
+        if (opts.getLogManagerDisruptor() == null) {
+            opts.setLogManagerDisruptor(new StripedDisruptor<LogManagerImpl.StableClosureEvent>(
+                NamedThreadFactory.threadPrefix(opts.getServerName(), "JRaft-LogManager-Disruptor"),
+                opts.getRaftOptions().getDisruptorBufferSize(),
+                () -> new LogManagerImpl.StableClosureEvent(),
+                opts.getStripes()));
+        }
+    }
+
     @OnlyForTest
     void tryElectSelf() {
         this.writeLock.lock();
@@ -1130,22 +1196,25 @@ public class NodeImpl implements Node, RaftServerService {
                 if (peer.equals(this.serverId)) {
                     continue;
                 }
-                if (!this.rpcClientService.connect(peer.getEndpoint())) {
-                    LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
-                    continue;
-                }
-                final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this);
-                done.request = raftOptions.getRaftMessagesFactory()
-                    .requestVoteRequest()
-                    .preVote(false) // It's not a pre-vote request.
-                    .groupId(this.groupId)
-                    .serverId(this.serverId.toString())
-                    .peerId(peer.toString())
-                    .term(this.currTerm)
-                    .lastLogIndex(lastLogId.getIndex())
-                    .lastLogTerm(lastLogId.getTerm())
-                    .build();
-                this.rpcClientService.requestVote(peer.getEndpoint(), done.request, done);
+    
+                rpcClientService.connectAsync(peer.getEndpoint()).thenAccept(ok -> {
+                    if (!ok) {
+                        LOG.warn("Node {} failed to init channel, address={}.", getNodeId(), peer.getEndpoint());
+                        return ;
+                    }
+                    final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this);
+                    done.request = raftOptions.getRaftMessagesFactory()
+                            .requestVoteRequest()
+                            .preVote(false) // It's not a pre-vote request.
+                            .groupId(this.groupId)
+                            .serverId(this.serverId.toString())
+                            .peerId(peer.toString())
+                            .term(this.currTerm)
+                            .lastLogIndex(lastLogId.getIndex())
+                            .lastLogTerm(lastLogId.getTerm())
+                            .build();
+                    this.rpcClientService.requestVote(peer.getEndpoint(), done.request, done);
+                });
             }
 
             this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId);
@@ -2414,17 +2483,13 @@ public class NodeImpl implements Node, RaftServerService {
         return this.options;
     }
 
-    public Scheduler getTimerManager() {
-        return this.timerManager;
-    }
-
     @Override
     public RaftOptions getRaftOptions() {
         return this.raftOptions;
     }
 
     @OnlyForTest
-    long getCurrentTerm() {
+    public long getCurrentTerm() {
         this.readLock.lock();
         try {
             return this.currTerm;
@@ -2641,9 +2706,10 @@ public class NodeImpl implements Node, RaftServerService {
 
         @Override
         public void run(final Status status) {
-            NodeImpl.this.metrics.recordLatency("pre-vote", Utils.monotonicMs() - this.startMs);
+            long latency = Utils.monotonicMs() - this.startMs;
+            NodeImpl.this.metrics.recordLatency("pre-vote", latency);
             if (!status.isOk()) {
-                LOG.warn("Node {} PreVote to {} error: {}.", getNodeId(), this.peer, status);
+                LOG.warn("Node {} PreVote to {} latency={} error: {}.", getNodeId(), this.peer, status, latency);
             }
             else {
                 handlePreVoteResponse(this.peer, this.term, getResponse());
@@ -2687,22 +2753,25 @@ public class NodeImpl implements Node, RaftServerService {
                 if (peer.equals(this.serverId)) {
                     continue;
                 }
-                if (!this.rpcClientService.connect(peer.getEndpoint())) {
-                    LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
-                    continue;
-                }
-                final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
-                done.request = raftOptions.getRaftMessagesFactory()
-                    .requestVoteRequest()
-                    .preVote(true) // it's a pre-vote request.
-                    .groupId(this.groupId)
-                    .serverId(this.serverId.toString())
-                    .peerId(peer.toString())
-                    .term(this.currTerm + 1) // next term
-                    .lastLogIndex(lastLogId.getIndex())
-                    .lastLogTerm(lastLogId.getTerm())
-                    .build();
-                this.rpcClientService.preVote(peer.getEndpoint(), done.request, done);
+    
+                rpcClientService.connectAsync(peer.getEndpoint()).thenAccept(ok -> {
+                    if (!ok) {
+                        LOG.warn("Node {} failed to init channel, address={}.", getNodeId(), peer.getEndpoint());
+                        return;
+                    }
+                    final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
+                    done.request = raftOptions.getRaftMessagesFactory()
+                            .requestVoteRequest()
+                            .preVote(true) // it's a pre-vote request.
+                            .groupId(this.groupId)
+                            .serverId(this.serverId.toString())
+                            .peerId(peer.toString())
+                            .term(this.currTerm + 1) // next term
+                            .lastLogIndex(lastLogId.getIndex())
+                            .lastLogTerm(lastLogId.getTerm())
+                            .build();
+                    this.rpcClientService.preVote(peer.getEndpoint(), done.request, done);
+                });
             }
             this.prevVoteCtx.grant(this.serverId);
             if (this.prevVoteCtx.isGranted()) {
@@ -2761,7 +2830,6 @@ public class NodeImpl implements Node, RaftServerService {
 
     @Override
     public void shutdown(final Closure done) {
-        List<RepeatedTimer> timers = null;
         this.writeLock.lock();
         try {
             LOG.info("Node {} shutdown, currTerm={} state={}.", getNodeId(), this.currTerm, this.state);
@@ -2773,8 +2841,8 @@ public class NodeImpl implements Node, RaftServerService {
                         new Status(RaftError.ESHUTDOWN, "Raft node is going to quit."));
                 }
                 this.state = State.STATE_SHUTTING;
-                // Stop all timers
-                timers = stopAllTimers();
+                // Stop all pending timer callbacks.
+                stopAllTimers();
                 if (this.readOnlyService != null) {
                     this.readOnlyService.shutdown();
                 }
@@ -2806,9 +2874,6 @@ public class NodeImpl implements Node, RaftServerService {
                             event.shutdownLatch = latch;
                         }));
                 }
-                if (!sharedTimerManager && this.timerManager != null) {
-                    this.timerManager.shutdown();
-                }
             }
 
             if (this.state != State.STATE_SHUTDOWN) {
@@ -2827,11 +2892,6 @@ public class NodeImpl implements Node, RaftServerService {
         }
         finally {
             this.writeLock.unlock();
-
-            // Destroy all timers out of lock
-            if (timers != null) {
-                destroyAllTimers(timers);
-            }
         }
     }
 
@@ -2885,6 +2945,58 @@ public class NodeImpl implements Node, RaftServerService {
         if (this.fsmCaller != null) {
             this.fsmCaller.join();
         }
+
+        // Stop and reset non shared pools.
+        NodeOptions opts = getOptions();
+
+        if (opts.getScheduler() != null && !opts.isSharedPools()) {
+            opts.getScheduler().shutdown();
+            opts.setScheduler(null);
+        }
+        if (opts.getElectionTimer() != null && !opts.isSharedPools()) {
+            opts.getElectionTimer().stop();
+            opts.setElectionTimer(null);
+        }
+        if (opts.getVoteTimer() != null && !opts.isSharedPools()) {
+            opts.getVoteTimer().stop();
+            opts.setVoteTimer(null);
+        }
+        if (opts.getStepDownTimer() != null && !opts.isSharedPools()) {
+            opts.getStepDownTimer().stop();
+            opts.setStepDownTimer(null);
+        }
+        if (opts.getSnapshotTimer() != null && !opts.isSharedPools()) {
+            opts.getSnapshotTimer().stop();
+            opts.setSnapshotTimer(null);
+        }
+        if (opts.getCommonExecutor() != null && !opts.isSharedPools()) {
+            ExecutorServiceHelper.shutdownAndAwaitTermination(opts.getCommonExecutor());
+            opts.setCommonExecutor(null);
+        }
+        if (opts.getStripedExecutor() != null && !opts.isSharedPools()) {
+            opts.getStripedExecutor().shutdownGracefully();
+            opts.setStripedExecutor(null);
+        }
+        if (opts.getClientExecutor() != null && !opts.isSharedPools()) {
+            ExecutorServiceHelper.shutdownAndAwaitTermination(opts.getClientExecutor());
+            opts.setClientExecutor(null);
+        }
+        if (opts.getfSMCallerExecutorDisruptor() != null && !opts.isSharedPools()) {
+            opts.getfSMCallerExecutorDisruptor().shutdown();
+            opts.setfSMCallerExecutorDisruptor(null);
+        }
+        if (opts.getNodeApplyDisruptor() != null && !opts.isSharedPools()) {
+            opts.getNodeApplyDisruptor().shutdown();
+            opts.setNodeApplyDisruptor(null);
+        }
+        if (opts.getReadOnlyServiceDisruptor() != null && !opts.isSharedPools()) {
+            opts.getReadOnlyServiceDisruptor().shutdown();
+            opts.setReadOnlyServiceDisruptor(null);
+        }
+        if (opts.getLogManagerDisruptor() != null && !opts.isSharedPools()) {
+            opts.getLogManagerDisruptor().shutdown();
+            opts.setLogManagerDisruptor(null);
+        }
     }
 
     private static class StopTransferArg {
@@ -3235,7 +3347,7 @@ public class NodeImpl implements Node, RaftServerService {
             LOG.info("Node {} starts to transfer leadership to peer {}.", getNodeId(), peer);
             final StopTransferArg stopArg = new StopTransferArg(this, this.currTerm, peerId);
             this.stopTransferArg = stopArg;
-            this.transferTimer = this.timerManager.schedule(() -> onTransferTimeout(stopArg),
+            this.transferTimer = this.getOptions().getScheduler().schedule(() -> onTransferTimeout(stopArg),
                 this.options.getElectionTimeoutMs(), TimeUnit.MILLISECONDS);
 
         }
@@ -3288,9 +3400,11 @@ public class NodeImpl implements Node, RaftServerService {
             // Parallelize response and election
             done.sendResponse(resp);
             doUnlock = false;
+    
+            LOG.info("Node {} received TimeoutNowRequest from {}, term={} and starts voting.", getNodeId(), request.serverId(),
+                    savedTerm);
+            
             electSelf();
-            LOG.info("Node {} received TimeoutNowRequest from {}, term={}.", getNodeId(), request.serverId(),
-                savedTerm);
         }
         finally {
             if (doUnlock) {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
index 31e6f56..ae309ba 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
@@ -266,7 +266,7 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
         this.fsmCaller.addLastAppliedLogIndexListener(this);
 
         // start scanner.
-        this.node.getTimerManager().scheduleAtFixedRate(() -> onApplied(this.fsmCaller.getLastAppliedIndex()),
+        this.node.getOptions().getScheduler().scheduleAtFixedRate(() -> onApplied(this.fsmCaller.getLastAppliedIndex()),
             this.raftOptions.getMaxElectionDelayMs(), this.raftOptions.getMaxElectionDelayMs(), TimeUnit.MILLISECONDS);
         return true;
     }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
index 14ef620..3d0c045 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
@@ -1026,7 +1026,7 @@ public class Replicator implements ThreadId.OnError {
                 this.catchUpClosure.getStatus().setError(code, RaftError.describeCode(code));
             }
             if (this.catchUpClosure.hasTimer()) {
-                if (!beforeDestroy && !this.catchUpClosure.getTimer().cancel(true)) {
+                if (!beforeDestroy && !this.catchUpClosure.getTimer().cancel(false)) { // Avoid interrupting a thread in the pool.
                     // There's running timer task, let timer task trigger
                     // on_caught_up to void ABA problem
                     return;
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/StateMachineAdapter.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/StateMachineAdapter.java
index 8a21fde..1fc3055 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/StateMachineAdapter.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/StateMachineAdapter.java
@@ -31,6 +31,8 @@ import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
  * State machine adapter that implements all methods with default behavior except {@link #onApply(Iterator)}.
  */
 public abstract class StateMachineAdapter implements StateMachine {
+    protected volatile long leaderTerm = -1;
+
     /** The logger */
     private static final IgniteLogger LOG = IgniteLogger.forClass(StateMachineAdapter.class);
 
@@ -51,8 +53,13 @@ public abstract class StateMachineAdapter implements StateMachine {
         return false;
     }
 
+    public long getLeaderTerm() {
+        return this.leaderTerm;
+    }
+
     @Override
     public void onLeaderStart(final long term) {
+        this.leaderTerm = term;
         LOG.info("onLeaderStart: term={}.", term);
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index 632446e..5e61cc9 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -23,13 +23,19 @@ import org.apache.ignite.raft.jraft.StateMachine;
 import org.apache.ignite.raft.jraft.conf.Configuration;
 import org.apache.ignite.raft.jraft.core.DefaultJRaftServiceFactory;
 import org.apache.ignite.raft.jraft.core.ElectionPriority;
+import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
 import org.apache.ignite.raft.jraft.core.Replicator;
 import org.apache.ignite.raft.jraft.core.Scheduler;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
+import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
 import org.apache.ignite.raft.jraft.util.Copiable;
 import org.apache.ignite.raft.jraft.util.StringUtils;
 import org.apache.ignite.raft.jraft.util.Utils;
 import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
+import org.apache.ignite.raft.jraft.util.timer.Timer;
 
 /**
  * Node options.
@@ -40,8 +46,8 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
 
     // A follower would become a candidate if it doesn't receive any message
     // from the leader in |election_timeout_ms| milliseconds
-    // Default: 1000 (1s)
-    private int electionTimeoutMs = 1000; // follower to candidate timeout
+    // Default: 1200 (1.2s)
+    private int electionTimeoutMs = 1200; // follower to candidate timeout
 
     // One node's local priority value would be set to | electionPriority |
     // value when it starts up.If this value is set to 0,the node will never be a leader.
@@ -119,11 +125,6 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     private boolean disableCli = false;
 
     /**
-     * Whether use global timer pool, if true, the {@code timerPoolSize} will be invalid.
-     */
-    private boolean sharedTimerPool = false;
-
-    /**
      * Timer manager thread pool size
      */
     private int timerPoolSize = Math.min(Utils.cpus() * 3, 20);
@@ -154,56 +155,83 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     private SnapshotThrottle snapshotThrottle;
 
     /**
-     * Whether use global election timer TODO asch remove this https://issues.apache.org/jira/browse/IGNITE-14832
+     * Custom service factory.
      */
-    private boolean sharedElectionTimer = false;
+    private JRaftServiceFactory serviceFactory = new DefaultJRaftServiceFactory();
 
     /**
-     * Whether use global vote timer TODO asch remove this https://issues.apache.org/jira/browse/IGNITE-14832
+     * Callbacks for replicator events.
      */
-    private boolean sharedVoteTimer = false;
+    private List<Replicator.ReplicatorStateListener> replicationStateListeners;
 
     /**
-     * Whether use global step down timer
+     * The common executor for short running tasks.
      */
-    private boolean sharedStepDownTimer = false;
+    private ExecutorService commonExecutor;
 
     /**
-     * Whether use global snapshot timer
+     * Striped executor for processing AppendEntries request/reponse.
      */
-    private boolean sharedSnapshotTimer = false;
+    private FixedThreadsExecutorGroup stripedExecutor;
 
     /**
-     * Custom service factory.
+     * The scheduler to execute delayed jobs.
      */
-    private JRaftServiceFactory serviceFactory = new DefaultJRaftServiceFactory();
+    private Scheduler scheduler;
 
     /**
-     *
+     * The election timer.
      */
-    private List<Replicator.ReplicatorStateListener> replicationStateListeners;
+    private Timer electionTimer;
 
     /**
-     * The common executor for short running tasks.
+     * The election timer.
      */
-    private ExecutorService commonExecutor;
+    private Timer voteTimer;
 
     /**
-     * Striped executor for processing AppendEntries request/reponse.
+     * The election timer.
      */
-    private FixedThreadsExecutorGroup stripedExecutor;
+    private Timer snapshotTimer;
 
     /**
-     * The scheduler to execute delayed jobs.
+     * The election timer.
      */
-    private Scheduler scheduler;
+    private Timer stepDownTimer;
 
-    /** Server name. */
+    /**
+     * Server name.
+     */
     private String serverName;
 
-    /** Amount of Disruptors that will handle the RAFT server. */
+    /**
+     * Striped disruptor for FSMCaller service. The queue serves of an Append entry requests in the RAFT state machine.
+     */
+    private StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor;
+
+    /**
+     * Striped disruptor for Node apply service.
+     */
+    private StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeApplyDisruptor;
+
+    /**
+     * Striped disruptor for Read only service.
+     */
+    private StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor;
+
+    /**
+     * Striped disruptor for Log manager service.
+     */
+    private StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
+
+    /**
+     * Amount of Disruptors that will handle the RAFT server.
+     */
     private int stripes = DEFAULT_STRIPES;
 
+    /** */
+    private boolean sharedPools = false;
+
     public NodeOptions() {
         raftOptions.setRaftMessagesFactory(getRaftMessagesFactory());
     }
@@ -223,6 +251,24 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     }
 
     /**
+     * Returns {@code true} if shared pools mode is in use.
+     *
+     * <p>In this mode thread pools are passed in the node options and node doesn't attempt to create/destroy any.
+     *
+     * @return {@code true} if shared pools mode is in use.
+     */
+    public boolean isSharedPools() {
+        return sharedPools;
+    }
+
+    /**
+     * @param sharedPools {code true} to enable shared pools mode.
+     */
+    public void setSharedPools(boolean sharedPools) {
+        this.sharedPools = sharedPools;
+    }
+
+    /**
      * The rpc client.
      */
     public JRaftServiceFactory getServiceFactory() {
@@ -278,14 +324,6 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
         this.commonThreadPollSize = commonThreadPollSize;
     }
 
-    public boolean isSharedTimerPool() {
-        return sharedTimerPool;
-    }
-
-    public void setSharedTimerPool(boolean sharedTimerPool) {
-        this.sharedTimerPool = sharedTimerPool;
-    }
-
     public int getTimerPoolSize() {
         return this.timerPoolSize;
     }
@@ -435,60 +473,60 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
         this.disableCli = disableCli;
     }
 
-    public boolean isSharedElectionTimer() {
-        return sharedElectionTimer;
+    public void setCommonExecutor(ExecutorService commonExecutor) {
+        this.commonExecutor = commonExecutor;
     }
 
-    public void setSharedElectionTimer(boolean sharedElectionTimer) {
-        this.sharedElectionTimer = sharedElectionTimer;
+    public ExecutorService getCommonExecutor() {
+        return commonExecutor;
     }
 
-    public boolean isSharedVoteTimer() {
-        return sharedVoteTimer;
+    public FixedThreadsExecutorGroup getStripedExecutor() {
+        return stripedExecutor;
     }
 
-    public void setSharedVoteTimer(boolean sharedVoteTimer) {
-        this.sharedVoteTimer = sharedVoteTimer;
+    public void setStripedExecutor(FixedThreadsExecutorGroup stripedExecutor) {
+        this.stripedExecutor = stripedExecutor;
     }
 
-    public boolean isSharedStepDownTimer() {
-        return sharedStepDownTimer;
+    public Scheduler getScheduler() {
+        return scheduler;
     }
 
-    public void setSharedStepDownTimer(boolean sharedStepDownTimer) {
-        this.sharedStepDownTimer = sharedStepDownTimer;
+    public void setScheduler(Scheduler scheduler) {
+        this.scheduler = scheduler;
     }
 
-    public boolean isSharedSnapshotTimer() {
-        return sharedSnapshotTimer;
+    public Timer getElectionTimer() {
+        return electionTimer;
     }
 
-    public void setSharedSnapshotTimer(boolean sharedSnapshotTimer) {
-        this.sharedSnapshotTimer = sharedSnapshotTimer;
+    public void setElectionTimer(Timer electionTimer) {
+        this.electionTimer = electionTimer;
     }
 
-    public void setCommonExecutor(ExecutorService commonExecutor) {
-        this.commonExecutor = commonExecutor;
+    public Timer getVoteTimer() {
+        return voteTimer;
     }
 
-    public ExecutorService getCommonExecutor() {
-        return commonExecutor;
+    public void setVoteTimer(Timer voteTimer) {
+        this.voteTimer = voteTimer;
     }
 
-    public FixedThreadsExecutorGroup getStripedExecutor() {
-        return stripedExecutor;
+    public Timer getSnapshotTimer() {
+        return snapshotTimer;
     }
 
-    public void setStripedExecutor(FixedThreadsExecutorGroup stripedExecutor) {
-        this.stripedExecutor = stripedExecutor;
+    public void setSnapshotTimer(Timer snapshotTimer) {
+        this.snapshotTimer = snapshotTimer;
     }
 
-    public Scheduler getScheduler() {
-        return scheduler;
+    public Timer getStepDownTimer() {
+        return stepDownTimer;
     }
 
-    public void setScheduler(Scheduler scheduler) {
-        this.scheduler = scheduler;
+    public void setStepDownTimer(Timer stepDownTimer) {
+        this.stepDownTimer = stepDownTimer;
     }
 
     public String getServerName() {
@@ -499,6 +537,38 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
         this.serverName = serverName;
     }
 
+    public StripedDisruptor<FSMCallerImpl.ApplyTask> getfSMCallerExecutorDisruptor() {
+        return fSMCallerExecutorDisruptor;
+    }
+
+    public void setfSMCallerExecutorDisruptor(StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor) {
+        this.fSMCallerExecutorDisruptor = fSMCallerExecutorDisruptor;
+    }
+
+    public StripedDisruptor<NodeImpl.LogEntryAndClosure> getNodeApplyDisruptor() {
+        return nodeApplyDisruptor;
+    }
+
+    public void setNodeApplyDisruptor(StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeApplyDisruptor) {
+        this.nodeApplyDisruptor = nodeApplyDisruptor;
+    }
+
+    public StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> getReadOnlyServiceDisruptor() {
+        return readOnlyServiceDisruptor;
+    }
+
+    public void setReadOnlyServiceDisruptor(StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor) {
+        this.readOnlyServiceDisruptor = readOnlyServiceDisruptor;
+    }
+
+    public StripedDisruptor<LogManagerImpl.StableClosureEvent> getLogManagerDisruptor() {
+        return logManagerDisruptor;
+    }
+
+    public void setLogManagerDisruptor(StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor) {
+        this.logManagerDisruptor = logManagerDisruptor;
+    }
+
     @Override
     public NodeOptions copy() {
         final NodeOptions nodeOptions = new NodeOptions();
@@ -510,17 +580,12 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
         nodeOptions.setCatchupMargin(this.catchupMargin);
         nodeOptions.setFilterBeforeCopyRemote(this.filterBeforeCopyRemote);
         nodeOptions.setDisableCli(this.disableCli);
-        nodeOptions.setSharedTimerPool(this.sharedTimerPool);
         nodeOptions.setTimerPoolSize(this.timerPoolSize);
         nodeOptions.setCliRpcThreadPoolSize(this.cliRpcThreadPoolSize);
         nodeOptions.setRaftRpcThreadPoolSize(this.raftRpcThreadPoolSize);
         nodeOptions.setCommonThreadPollSize(this.commonThreadPollSize);
         nodeOptions.setEnableMetrics(this.enableMetrics);
         nodeOptions.setRaftOptions(this.raftOptions.copy());
-        nodeOptions.setSharedElectionTimer(this.sharedElectionTimer);
-        nodeOptions.setSharedVoteTimer(this.sharedVoteTimer);
-        nodeOptions.setSharedStepDownTimer(this.sharedStepDownTimer);
-        nodeOptions.setSharedSnapshotTimer(this.sharedSnapshotTimer);
         nodeOptions.setReplicationStateListeners(this.replicationStateListeners);
         nodeOptions.setCommonExecutor(this.getCommonExecutor());
         nodeOptions.setStripedExecutor(this.getStripedExecutor());
@@ -531,6 +596,13 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
         nodeOptions.setfSMCallerExecutorDisruptor(this.getfSMCallerExecutorDisruptor());
         nodeOptions.setReadOnlyServiceDisruptor(this.getReadOnlyServiceDisruptor());
         nodeOptions.setLogManagerDisruptor(this.getLogManagerDisruptor());
+        nodeOptions.setElectionTimer(this.getElectionTimer());
+        nodeOptions.setVoteTimer(this.getVoteTimer());
+        nodeOptions.setSnapshotTimer(this.getSnapshotTimer());
+        nodeOptions.setStepDownTimer(this.getStepDownTimer());
+        nodeOptions.setSharedPools(this.isSharedPools());
+        nodeOptions.setRpcDefaultTimeout(this.getRpcDefaultTimeout());
+        nodeOptions.setRpcConnectTimeoutMs(this.getRpcConnectTimeoutMs());
 
         return nodeOptions;
     }
@@ -543,11 +615,9 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
             + snapshotLogIndexMargin + ", catchupMargin=" + catchupMargin + ", initialConf=" + initialConf
             + ", fsm=" + fsm + ", logUri='" + logUri + '\'' + ", raftMetaUri='" + raftMetaUri + '\''
             + ", snapshotUri='" + snapshotUri + '\'' + ", filterBeforeCopyRemote=" + filterBeforeCopyRemote
-            + ", disableCli=" + disableCli + ", sharedTimerPool=" + sharedTimerPool + ", timerPoolSize="
+            + ", disableCli=" + disableCli + ", timerPoolSize="
             + timerPoolSize + ", cliRpcThreadPoolSize=" + cliRpcThreadPoolSize + ", raftRpcThreadPoolSize="
             + raftRpcThreadPoolSize + ", enableMetrics=" + enableMetrics + ", snapshotThrottle=" + snapshotThrottle
-            + ", sharedElectionTimer=" + sharedElectionTimer + ", sharedVoteTimer=" + sharedVoteTimer
-            + ", sharedStepDownTimer=" + sharedStepDownTimer + ", sharedSnapshotTimer=" + sharedSnapshotTimer
             + ", serviceFactory=" + serviceFactory + ", raftOptions=" + raftOptions + "} " + super.toString();
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java
index 7452233..9807120 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java
@@ -16,42 +16,41 @@
  */
 package org.apache.ignite.raft.jraft.option;
 
-import com.codahale.metrics.MetricRegistry;
 import java.util.concurrent.ExecutorService;
+import com.codahale.metrics.MetricRegistry;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
-import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
-import org.apache.ignite.raft.jraft.core.NodeImpl;
-import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
-import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.rpc.RpcClient;
-import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
 
 public class RpcOptions {
     /** Raft message factory. */
     private RaftMessagesFactory raftMessagesFactory = new RaftMessagesFactory();
 
     /**
-     * Rpc handshake timeout in milliseconds Default: 2000(1s)
+     * Rpc connect timeout in milliseconds.
+     * Default: 1200 (1.2s)
      */
-    private int rpcConnectTimeoutMs = 2000; // TODO asch rename to handshake timeout IGNITE-14832.
+    private int rpcConnectTimeoutMs = 1200;
 
     /**
-     * RPC request default timeout in milliseconds Default: 5000(5s)
+     * RPC request default timeout in milliseconds.
+     * Default: 5000 (5s)
      */
     private int rpcDefaultTimeout = 5000;
 
     /**
-     * Install snapshot RPC request default timeout in milliseconds Default: 5 * 60 * 1000(5min)
+     * Install snapshot RPC request default timeout in milliseconds.
+     * Default: 5m
      */
     private int rpcInstallSnapshotTimeout = 5 * 60 * 1000;
 
     /**
-     * RPC process thread pool size Default: 80
+     * RPC process thread pool size.
+     * Default: 80
      */
     private int rpcProcessorThreadPoolSize = 80;
 
     /**
-     * Whether to enable checksum for RPC. Default: false
+     * Whether to enable checksum for RPC.
      */
     private boolean enableRpcChecksum = false;
 
@@ -65,50 +64,6 @@ public class RpcOptions {
      */
     private ExecutorService clientExecutor;
 
-    /** Striped disruptor for FSMCaller service. The queue serves of an Append entry requests in the RAFT state machine. */
-    private StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor;
-
-    /** Striped disruptor for Node apply service. */
-    private StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeApplyDisruptor;
-
-    /** Striped disruptor for Read only service. */
-    private StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor;
-
-    /** Striped disruptor for Log manager service. */
-    private StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
-
-    public StripedDisruptor<FSMCallerImpl.ApplyTask> getfSMCallerExecutorDisruptor() {
-        return fSMCallerExecutorDisruptor;
-    }
-
-    public void setfSMCallerExecutorDisruptor(StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor) {
-        this.fSMCallerExecutorDisruptor = fSMCallerExecutorDisruptor;
-    }
-
-    public StripedDisruptor<NodeImpl.LogEntryAndClosure> getNodeApplyDisruptor() {
-        return nodeApplyDisruptor;
-    }
-
-    public void setNodeApplyDisruptor(StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeApplyDisruptor) {
-        this.nodeApplyDisruptor = nodeApplyDisruptor;
-    }
-
-    public StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> getReadOnlyServiceDisruptor() {
-        return readOnlyServiceDisruptor;
-    }
-
-    public void setReadOnlyServiceDisruptor(StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor) {
-        this.readOnlyServiceDisruptor = readOnlyServiceDisruptor;
-    }
-
-    public StripedDisruptor<LogManagerImpl.StableClosureEvent> getLogManagerDisruptor() {
-        return logManagerDisruptor;
-    }
-
-    public void setLogManagerDisruptor(StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor) {
-        this.logManagerDisruptor = logManagerDisruptor;
-    }
-
     /**
      * Metric registry for RPC services, user should not use this field.
      */
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ClientService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ClientService.java
index 113de32..fe28df4 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ClientService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ClientService.java
@@ -16,7 +16,7 @@
  */
 package org.apache.ignite.raft.jraft.rpc;
 
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.raft.jraft.Lifecycle;
 import org.apache.ignite.raft.jraft.option.RpcOptions;
 import org.apache.ignite.raft.jraft.util.Endpoint;
@@ -26,12 +26,20 @@ import org.apache.ignite.raft.jraft.util.Endpoint;
  */
 public interface ClientService extends Lifecycle<RpcOptions> {
     /**
-     * Connect to endpoint, returns true when success. TODO asch rename to isAlive IGNITE-14832.
+     * Connect to endpoint, returns true when success. TODO asch it seems we don't need it IGNITE-14832.
      *
      * @param endpoint server address
      * @return true on connect success
      */
     boolean connect(final Endpoint endpoint);
+    
+    /**
+     * Connect to endpoint asynchronously, returns true when success.
+     *
+     * @param endpoint server address
+     * @return The future with the result.
+     */
+    CompletableFuture<Boolean> connectAsync(final Endpoint endpoint);
 
     /**
      * Send a requests and waits for response with callback, returns the request future.
@@ -42,6 +50,6 @@ public interface ClientService extends Lifecycle<RpcOptions> {
      * @param timeoutMs timeout millis
      * @return a future with operation result
      */
-    <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
+    <T extends Message> CompletableFuture<Message> invokeWithDone(final Endpoint endpoint, final Message request,
         final RpcResponseClosure<T> done, final int timeoutMs);
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
index b0c0240..792f621 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
@@ -18,10 +18,10 @@ package org.apache.ignite.raft.jraft.rpc.impl;
 
 import java.net.ConnectException;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.TopologyEventHandler;
@@ -106,69 +106,71 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
 
     @Override
     public boolean connect(final Endpoint endpoint) {
+        try {
+            return connectAsync(endpoint).get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+    
+            LOG.error("Interrupted while connecting to {}, exception: {}.", endpoint, e.getMessage());
+        } catch (ExecutionException e) {
+            LOG.error("Fail to connect {}, exception: {}.", endpoint, e.getMessage());
+        }
+        
+        return false;
+    }
+    
+    @Override
+    public CompletableFuture<Boolean> connectAsync(Endpoint endpoint) {
         final RpcClient rc = this.rpcClient;
         if (rc == null) {
             throw new IllegalStateException("Client service is uninitialized.");
         }
-
+    
         // Remote node is alive and pinged, safe to continue.
-        if (readyAddresses.contains(endpoint.toString()))
-            return true;
-
-        // Remote node must be pinged to make sure listeners are set.
-        synchronized (this) {
-            if (readyAddresses.contains(endpoint.toString()))
+        if (readyAddresses.contains(endpoint.toString())) {
+            return CompletableFuture.completedFuture(true);
+        }
+    
+        final RpcRequests.PingRequest req = rpcOptions.getRaftMessagesFactory()
+                .pingRequest()
+                .sendTimestamp(System.currentTimeMillis())
+                .build();
+    
+        CompletableFuture<Message> fut =
+                invokeWithDone(endpoint, req, null, null, rpcOptions.getRpcConnectTimeoutMs(), rpcExecutor);
+    
+        return fut.thenApply(msg -> {
+            ErrorResponse resp = (ErrorResponse) msg;
+        
+            if (resp != null && resp.errorCode() == 0) {
+                readyAddresses.add(endpoint.toString());
+            
                 return true;
-
-            try {
-                final RpcRequests.PingRequest req = rpcOptions.getRaftMessagesFactory()
-                    .pingRequest()
-                    .sendTimestamp(System.currentTimeMillis())
-                    .build();
-
-                Future<Message> fut =
-                    invokeWithDone(endpoint, req, null, null, rpcOptions.getRpcConnectTimeoutMs(), rpcExecutor);
-
-                final ErrorResponse resp = (ErrorResponse) fut.get(); // Future will be certainly terminated by timeout.
-
-                if (resp != null && resp.errorCode() == 0) {
-                    readyAddresses.add(endpoint.toString());
-
-                    return true;
-                }
-                else
-                    return false;
-            }
-            catch (final InterruptedException e) {
-                Thread.currentThread().interrupt();
+            } else {
+                return false;
             }
-            catch (final ExecutionException e) {
-                LOG.error("Fail to connect {}, exception: {}.", endpoint, e.getMessage());
-            }
-        }
-
-        return false;
+        });
     }
-
+    
     @Override
-    public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
+    public <T extends Message> CompletableFuture<Message> invokeWithDone(final Endpoint endpoint, final Message request,
         final RpcResponseClosure<T> done, final int timeoutMs) {
         return invokeWithDone(endpoint, request, done, timeoutMs, this.rpcExecutor);
     }
 
-    public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
+    public <T extends Message> CompletableFuture<Message> invokeWithDone(final Endpoint endpoint, final Message request,
         final RpcResponseClosure<T> done, final int timeoutMs,
         final Executor rpcExecutor) {
         return invokeWithDone(endpoint, request, null, done, timeoutMs, rpcExecutor);
     }
 
-    public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
+    public <T extends Message> CompletableFuture<Message> invokeWithDone(final Endpoint endpoint, final Message request,
         final InvokeContext ctx,
         final RpcResponseClosure<T> done, final int timeoutMs) {
         return invokeWithDone(endpoint, request, ctx, done, timeoutMs, this.rpcExecutor);
     }
 
-    public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
+    public <T extends Message> CompletableFuture<Message> invokeWithDone(final Endpoint endpoint, final Message request,
         final InvokeContext ctx,
         final RpcResponseClosure<T> done, final int timeoutMs,
         final Executor rpcExecutor) {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
index dee12eb..1562188 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
@@ -24,6 +24,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiPredicate;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NetworkMessage;
@@ -41,6 +43,8 @@ import org.apache.ignite.raft.jraft.util.Utils;
 import static org.apache.ignite.raft.jraft.JRaftUtils.addressFromEndpoint;
 
 public class IgniteRpcClient implements RpcClientEx {
+    private static final IgniteLogger LOG = IgniteLogger.forClass(IgniteRpcClient.class);
+    
     private volatile BiPredicate<Object, String> recordPred;
 
     private BiPredicate<Object, String> blockPred;
@@ -108,13 +112,17 @@ public class IgniteRpcClient implements RpcClientEx {
 
         synchronized (this) {
             if (blockPred != null && blockPred.test(request, endpoint.toString())) {
-                blockedMsgs.add(new Object[] {
-                    request,
-                    endpoint.toString(),
-                    fut.hashCode(),
-                    System.currentTimeMillis(),
-                    (Runnable)() -> send(endpoint, request, fut, timeoutMs)
-                });
+                Object[] msgData = {
+                        request,
+                        endpoint.toString(),
+                        fut.hashCode(),
+                        System.currentTimeMillis(),
+                        (Runnable) () -> send(endpoint, request, fut, timeoutMs)
+                };
+                
+                blockedMsgs.add(msgData);
+    
+                LOG.info("Blocked message to={} id={} msg={}", endpoint.toString(), msgData[2], S.toString(request));
 
                 return fut;
             }
@@ -163,7 +171,9 @@ public class IgniteRpcClient implements RpcClientEx {
 
         for (Object[] msg : msgs) {
             Runnable r = (Runnable) msg[4];
-
+    
+            LOG.info("Unblocked message to={} id={} msg={}", msg[1], msg[2], S.toString(msg[0]));
+            
             r.run();
         }
     }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 2a39de3..ac2c6a7 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -22,6 +22,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.NetworkMessageHandler;
 import org.apache.ignite.raft.jraft.RaftMessageGroup;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -56,6 +58,8 @@ import org.apache.ignite.raft.jraft.rpc.impl.core.TimeoutNowRequestProcessor;
  * TODO https://issues.apache.org/jira/browse/IGNITE-14519 Unsubscribe on shutdown
  */
 public class IgniteRpcServer implements RpcServer<Void> {
+    private static final IgniteLogger LOG = IgniteLogger.forClass(IgniteRpcServer.class);
+
     private final ClusterService service;
 
     private final NodeManager nodeManager;
@@ -91,7 +95,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
         registerProcessor(new GetFileRequestProcessor(rpcExecutor, raftMessagesFactory));
         registerProcessor(new InstallSnapshotRequestProcessor(rpcExecutor, raftMessagesFactory));
         registerProcessor(new RequestVoteRequestProcessor(rpcExecutor, raftMessagesFactory));
-        registerProcessor(new PingRequestProcessor(rpcExecutor, raftMessagesFactory)); // TODO asch this should go last.
+        registerProcessor(new PingRequestProcessor(rpcExecutor, raftMessagesFactory));
         registerProcessor(new TimeoutNowRequestProcessor(rpcExecutor, raftMessagesFactory));
         registerProcessor(new ReadIndexRequestProcessor(rpcExecutor, raftMessagesFactory));
         // raft native cli service
@@ -111,6 +115,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
 
         var messageHandler = new RpcMessageHandler();
 
+        // Add the handler after all processors are set up.
         service.messagingService().addMessageHandler(RaftMessageGroup.class, messageHandler);
 
         service.topologyService().addEventHandler(new TopologyEventHandler() {
@@ -185,7 +190,8 @@ public class IgniteRpcServer implements RpcServer<Void> {
                     finalPrc.handleRequest(context, message);
                 });
             } catch (RejectedExecutionException e) {
-                // Node is stopping.
+                // The rejection is ok if an executor has been stopped, otherwise it shouldn't happen.
+                LOG.warn("A request execution was rejected [sender={} req={} reason={}]", senderAddr, S.toString(message), e.getMessage());
             }
         }
     }
@@ -217,5 +223,6 @@ public class IgniteRpcServer implements RpcServer<Void> {
 
     /** {@inheritDoc} */
     @Override public void shutdown() {
+        // Should deregister listeners.
     }
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/PingRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/PingRequestProcessor.java
index a653dc2..6bd0c6f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/PingRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/PingRequestProcessor.java
@@ -17,6 +17,7 @@
 package org.apache.ignite.raft.jraft.rpc.impl;
 
 import java.util.concurrent.Executor;
+import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
 import org.apache.ignite.raft.jraft.rpc.RpcContext;
@@ -27,6 +28,8 @@ import org.apache.ignite.raft.jraft.rpc.RpcRequests.PingRequest;
  * Ping request processor.
  */
 public class PingRequestProcessor implements RpcProcessor<PingRequest> {
+    private static final IgniteLogger LOG = IgniteLogger.forClass(PingRequestProcessor.class);
+    
     /** The executor */
     private final Executor executor;
 
@@ -44,6 +47,8 @@ public class PingRequestProcessor implements RpcProcessor<PingRequest> {
     /** {@inheritDoc} */
     @Override
     public void handleRequest(final RpcContext rpcCtx, final PingRequest request) {
+        LOG.debug("Pinged from={}", rpcCtx.getRemoteAddress());
+        
         rpcCtx.sendResponse(RaftRpcFactory.DEFAULT.newResponse(msgFactory, 0, "OK"));
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
index 4593058..1feac60 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
@@ -912,7 +912,7 @@ public class LogManagerImpl implements LogManager {
             this.readLock.unlock();
         }
         try {
-            c.await();
+            c.await(); // TODO FIXME asch https://issues.apache.org/jira/browse/IGNITE-15974
         }
         catch (final InterruptedException e) {
             Thread.currentThread().interrupt();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
index 280c890..4097417 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
@@ -651,7 +651,7 @@ public class SnapshotExecutorImpl implements SnapshotExecutor {
         final SnapshotCopierOptions copierOpts = new SnapshotCopierOptions();
         copierOpts.setNodeOptions(this.node.getOptions());
         copierOpts.setRaftClientService(this.node.getRpcClientService());
-        copierOpts.setTimerManager(this.node.getTimerManager());
+        copierOpts.setTimerManager(this.node.getOptions().getScheduler());
         copierOpts.setRaftOptions(this.node.getRaftOptions());
         return copierOpts;
     }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/timer/DefaultTimer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/timer/DefaultTimer.java
index 07637fe..7b20039 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/timer/DefaultTimer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/timer/DefaultTimer.java
@@ -91,7 +91,7 @@ public class DefaultTimer implements Timer {
                 @Override
                 public boolean cancel() {
                     final ScheduledFuture<?> f = future;
-                    return f != null && f.cancel(true);
+                    return f != null && f.cancel(false); // Avoid interrupting a thread in the pool.
                 }
             };
         }
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
index 9672e1b..f06911a 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
@@ -193,13 +193,13 @@ public class RaftServerImpl implements RaftServer {
     public @Nullable Peer localPeer(String groupId) {
         return new Peer(service.topologyService().localMember().address());
     }
-    
+
     /** {@inheritDoc} */
     @Override
     public Set<String> startedGroups() {
         return listeners.keySet();
     }
-    
+
     /**
      * Handle action request.
      *
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
index 056e686..8033ffc 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
@@ -45,7 +45,6 @@ public class MockStateMachine extends StateMachineAdapter {
     private final Lock lock = new ReentrantLock();
     private volatile int onStartFollowingTimes = 0;
     private volatile int onStopFollowingTimes = 0;
-    private volatile long leaderTerm = -1;
     private volatile long appliedIndex = -1;
     private volatile long snapshotIndex = -1L;
     private final List<ByteBuffer> logs = new ArrayList<>();
@@ -78,10 +77,6 @@ public class MockStateMachine extends StateMachineAdapter {
         return this.onStopFollowingTimes;
     }
 
-    public long getLeaderTerm() {
-        return this.leaderTerm;
-    }
-
     public long getAppliedIndex() {
         return this.appliedIndex;
     }
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
index 05af451..f098a6c 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
@@ -104,7 +104,6 @@ public class ReadOnlyServiceTest {
         nodeOptions.setScheduler(scheduler);
         Mockito.when(this.node.getNodeMetrics()).thenReturn(new NodeMetrics(false));
         Mockito.when(this.node.getGroupId()).thenReturn("test");
-        Mockito.when(this.node.getTimerManager()).thenReturn(nodeOptions.getScheduler());
         Mockito.when(this.node.getOptions()).thenReturn(nodeOptions);
         Mockito.when(this.node.getNodeId()).thenReturn(new NodeId("test", new PeerId("localhost:8081", 0)));
         Mockito.when(this.node.getServerId()).thenReturn(new PeerId("localhost:8081", 0));
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index b24c08d..87b0425 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -22,13 +22,11 @@ import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -48,18 +46,16 @@ import org.apache.ignite.raft.jraft.Node;
 import org.apache.ignite.raft.jraft.NodeManager;
 import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.apache.ignite.raft.jraft.conf.Configuration;
-import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.rpc.TestIgniteRpcServer;
 import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
 import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
-import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
 import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.raft.jraft.util.Endpoint;
 import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
-import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
+import org.apache.ignite.raft.jraft.util.Utils;
 import org.apache.ignite.utils.ClusterServiceTestUtils;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.TestInfo;
@@ -76,9 +72,9 @@ public class TestCluster {
     /**
      * Default election timeout.
      * Important: due to sync disk ops (writing raft meta) during probe request processing this timeout should be high
-     * enough to avoid test flakiness.
+     * enough to avoid test flakiness. Test environment might give another instability.
      */
-    private static final int ELECTION_TIMEOUT_MILLIS = 600;
+    private static final int ELECTION_TIMEOUT_MILLIS = 1000;
 
     private static final IgniteLogger LOG = IgniteLogger.forClass(TestCluster.class);
 
@@ -95,32 +91,6 @@ public class TestCluster {
     /** Test info. */
     private final TestInfo testInfo;
 
-    /**
-     * These disruptors will be used for all RAFT servers in the cluster.
-     */
-    private final HashMap<Endpoint, StripedDisruptor<FSMCallerImpl.ApplyTask>> fsmCallerDusruptors = new HashMap<>();
-
-    /**
-     * These disruptors will be used for all RAFT servers in the cluster.
-     */
-    private final HashMap<Endpoint, StripedDisruptor<NodeImpl.LogEntryAndClosure>> nodeDisruptors = new HashMap<>();
-
-    /**
-     * These disruptors will be used for all RAFT servers in the cluster.
-     */
-    private final HashMap<Endpoint, StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent>> readOnlyServiceDisruptors = new HashMap<>();
-
-    /**
-     * These disruptors will be used for all RAFT servers in the cluster.
-     */
-    private final HashMap<Endpoint, StripedDisruptor<LogManagerImpl.StableClosureEvent>> logManagerDisruptors = new HashMap<>();
-
-    private List<ExecutorService> executors = new CopyOnWriteArrayList<>();
-
-    private List<FixedThreadsExecutorGroup> fixedThreadsExecutorGroups = new CopyOnWriteArrayList<>();
-
-    private List<Scheduler> schedulers = new CopyOnWriteArrayList<>();
-
     private JRaftServiceFactory raftServiceFactory = new TestJRaftServiceFactory();
 
     private LinkedHashSet<PeerId> learners;
@@ -236,23 +206,11 @@ public class TestCluster {
                 return true;
             }
 
+            // Start node in non shared pools mode. Pools will be managed by node itself.
             NodeOptions nodeOptions = new NodeOptions();
 
             nodeOptions.setServerName(listenAddr.toString());
 
-            ExecutorService executor = JRaftUtils.createCommonExecutor(nodeOptions);
-            executors.add(executor);
-            nodeOptions.setCommonExecutor(executor);
-            FixedThreadsExecutorGroup threadsExecutorGroup = JRaftUtils.createAppendEntriesExecutor(nodeOptions);
-            fixedThreadsExecutorGroups.add(threadsExecutorGroup);
-            nodeOptions.setStripedExecutor(threadsExecutorGroup);
-            ExecutorService clientExecutor = JRaftUtils.createClientExecutor(nodeOptions, nodeOptions.getServerName());
-            executors.add(clientExecutor);
-            nodeOptions.setClientExecutor(clientExecutor);
-            Scheduler scheduler = JRaftUtils.createScheduler(nodeOptions);
-            schedulers.add(scheduler);
-            nodeOptions.setScheduler(scheduler);
-
             nodeOptions.setElectionTimeoutMs(this.electionTimeoutMs);
             nodeOptions.setEnableMetrics(enableMetrics);
             nodeOptions.setSnapshotThrottle(snapshotThrottle);
@@ -267,31 +225,16 @@ public class TestCluster {
             nodeOptions.setRaftMetaUri(serverDataPath + File.separator + "meta");
             nodeOptions.setSnapshotUri(serverDataPath + File.separator + "snapshot");
             nodeOptions.setElectionPriority(priority);
-
-            nodeOptions.setfSMCallerExecutorDisruptor(fsmCallerDusruptors.computeIfAbsent(listenAddr, endpoint -> new StripedDisruptor<>(
-                "JRaft-FSMCaller-Disruptor_TestCluster",
-                nodeOptions.getRaftOptions().getDisruptorBufferSize(),
-                () -> new FSMCallerImpl.ApplyTask(),
-                nodeOptions.getStripes())));
-
-            nodeOptions.setNodeApplyDisruptor(nodeDisruptors.computeIfAbsent(listenAddr, endpoint -> new StripedDisruptor<>(
-                "JRaft-NodeImpl-Disruptor_TestCluster",
-                nodeOptions.getRaftOptions().getDisruptorBufferSize(),
-                () -> new NodeImpl.LogEntryAndClosure(),
-                nodeOptions.getStripes())));
-
-            nodeOptions.setReadOnlyServiceDisruptor(readOnlyServiceDisruptors.computeIfAbsent(listenAddr, endpoint -> new StripedDisruptor<>(
-                "JRaft-ReadOnlyService-Disruptor_TestCluster",
-                nodeOptions.getRaftOptions().getDisruptorBufferSize(),
-                () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
-                nodeOptions.getStripes())));
-
-            nodeOptions.setLogManagerDisruptor(logManagerDisruptors.computeIfAbsent(listenAddr, endpoint -> new StripedDisruptor<>(
-                "JRaft-LogManager-Disruptor_TestCluster",
-                nodeOptions.getRaftOptions().getDisruptorBufferSize(),
-                () -> new LogManagerImpl.StableClosureEvent(),
-                nodeOptions.getStripes())));
-
+    
+            // Align rpc options with election timeout.
+            nodeOptions.setRpcConnectTimeoutMs(this.electionTimeoutMs / 3);
+            nodeOptions.setRpcDefaultTimeout(this.electionTimeoutMs / 2);
+            
+            // Reduce default threads count per test node.
+            nodeOptions.setRaftRpcThreadPoolSize(Utils.cpus());
+            nodeOptions.setTimerPoolSize(Utils.cpus() * 2);
+            nodeOptions.setRpcProcessorThreadPoolSize(Utils.cpus() * 3);
+    
             MockStateMachine fsm = new MockStateMachine(listenAddr);
             nodeOptions.setFsm(fsm);
 
@@ -319,8 +262,6 @@ public class TestCluster {
 
             ExecutorService requestExecutor = JRaftUtils.createRequestExecutor(nodeOptions);
 
-            executors.add(requestExecutor);
-
             var rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions, requestExecutor);
 
             clusterService.start();
@@ -331,10 +272,15 @@ public class TestCluster {
             RaftGroupService server = new RaftGroupService(this.name, new PeerId(listenAddr, 0, priority),
                 nodeOptions, rpcServer, nodeManager) {
                 @Override public synchronized void shutdown() {
-                    super.shutdown();
-
+                    // This stop order is consistent with JRaftServerImpl
                     rpcServer.shutdown();
 
+                    ExecutorServiceHelper.shutdownAndAwaitTermination(requestExecutor);
+
+                    super.shutdown();
+    
+                    // Network service must be stopped after a node because raft initiates timeoutnowrequest on stop for faster
+                    // leader election.
                     clusterService.stop();
                 }
             };
@@ -402,14 +348,6 @@ public class TestCluster {
         List<Endpoint> addrs = getAllNodes();
         for (Endpoint addr : addrs)
             stop(addr);
-
-        fsmCallerDusruptors.values().forEach(StripedDisruptor::shutdown);
-        nodeDisruptors.values().forEach(StripedDisruptor::shutdown);
-        readOnlyServiceDisruptors.values().forEach(StripedDisruptor::shutdown);
-        logManagerDisruptors.values().forEach(StripedDisruptor::shutdown);
-        executors.forEach(ExecutorServiceHelper::shutdownAndAwaitTermination);
-        fixedThreadsExecutorGroups.forEach(FixedThreadsExecutorGroup::shutdownGracefully);
-        schedulers.forEach(Scheduler::shutdown);
     }
 
     public void clean(Endpoint listenAddr) {
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
index e650e03..60fbd2e 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
@@ -121,9 +121,9 @@ public class SnapshotExecutorTest extends BaseStorageTest {
         Mockito.when(node.getRaftOptions()).thenReturn(new RaftOptions());
         options = new NodeOptions();
         options.setCommonExecutor(JRaftUtils.createExecutor("test-executor", Utils.cpus()));
+        options.setScheduler(timerManager);
         Mockito.when(node.getOptions()).thenReturn(options);
         Mockito.when(node.getRpcClientService()).thenReturn(raftClientService);
-        Mockito.when(node.getTimerManager()).thenReturn(timerManager);
         Mockito.when(node.getServiceFactory()).thenReturn(new DefaultJRaftServiceFactory());
         executor = new SnapshotExecutorImpl();
         final SnapshotExecutorOptions opts = new SnapshotExecutorOptions();
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
index 0cc551e..c4dbcf2 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
@@ -40,6 +40,9 @@ import org.apache.ignite.raft.jraft.util.Endpoint;
 import org.mockito.ArgumentCaptor;
 
 import static java.lang.Thread.sleep;
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test helper
@@ -183,4 +186,14 @@ public class TestUtils {
             }
         }, timeout);
     }
+
+    /**
+     * Waits until all RAFT threads are stopped, throwing an AssertionException otherwise.
+     */
+    public static void assertAllJraftThreadsStopped() {
+        assertTrue(waitForCondition(() -> Thread.getAllStackTraces().keySet().stream().
+                noneMatch(t -> t.getName().contains("JRaft")), 5_000),
+            Thread.getAllStackTraces().keySet().stream().filter(t -> t.getName().contains("JRaft")).
+                sorted(comparing(Thread::getName)).collect(toList()).toString());
+    }
 }