You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/06/21 13:04:39 UTC

[ignite-3] branch main updated: IGNITE-14853 Stabilize flaky tests in jraft module - Fixes #176.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d42f95  IGNITE-14853 Stabilize flaky tests in jraft module - Fixes #176.
7d42f95 is described below

commit 7d42f95c19c5642d8e00a664aec154a2faa5a357
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Mon Jun 21 16:01:24 2021 +0300

    IGNITE-14853 Stabilize flaky tests in jraft module - Fixes #176.
    
    Signed-off-by: Alexey Scherbakov <al...@gmail.com>
---
 .../internal/metastorage/MetaStorageManager.java   |   2 +-
 .../apache/ignite/network/MessagingService.java    |  10 +
 .../ignite/network/NetworkMessageHandler.java      |   5 +-
 .../scalecube/ITScaleCubeNetworkMessagingTest.java |  13 +-
 .../scalecube/ScaleCubeClusterServiceFactory.java  |   2 +-
 .../scalecube/ScaleCubeMessagingService.java       |  28 +-
 .../client/service/impl/RaftGroupServiceImpl.java  |   1 +
 .../ignite/raft/jraft/core/ITCliServiceTest.java   |  53 ++--
 .../apache/ignite/raft/jraft/core/ITNodeTest.java  | 297 ++++++++++++---------
 .../raft/server/ITJRaftCounterServerTest.java      |  21 +-
 .../internal/raft/server/impl/JRaftServerImpl.java |   2 +-
 .../internal/raft/server/impl/RaftServerImpl.java  |  15 +-
 .../apache/ignite/raft/jraft/RaftGroupService.java |  37 +--
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |  11 +-
 .../ignite/raft/jraft/error/RemotingException.java |   1 -
 .../apache/ignite/raft/jraft/rpc/RpcClient.java    |  52 +---
 .../ignite/raft/jraft/rpc/RpcRequestClosure.java   |   1 +
 .../ignite/raft/jraft/rpc/RpcRequestProcessor.java |   1 +
 .../raft/jraft/rpc/RpcResponseClosureAdapter.java  |   4 -
 .../raft/jraft/rpc/impl/AbstractClientService.java |  47 ++--
 .../raft/jraft/rpc/impl/IgniteRpcClient.java       |  63 +----
 .../raft/jraft/rpc/impl/IgniteRpcServer.java       |   8 +-
 .../impl/core/AppendEntriesRequestProcessor.java   |   6 +-
 .../ignite/raft/jraft/util/internal/ThrowUtil.java |  60 +++--
 .../apache/ignite/raft/jraft/core/TestCluster.java |  94 +++++--
 .../raft/jraft/rpc/AbstractClientServiceTest.java  |  66 +----
 .../ignite/raft/jraft/rpc/AbstractRpcTest.java     |  91 +------
 .../ignite/raft/jraft/rpc/IgniteRpcTest.java       |   4 +-
 .../ignite/raft/jraft/rpc/TestIgniteRpcServer.java |  11 +-
 .../snapshot/local/LocalSnapshotReaderTest.java    |   9 +-
 30 files changed, 465 insertions(+), 550 deletions(-)

diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 4ded54b..7e1316f 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -167,7 +167,7 @@ public class MetaStorageManager {
 //        );
 
         // TODO: IGNITE-14414 Cluster initialization flow. Here we should complete metaStorageServiceFuture.
-        clusterNetSvc.messagingService().addMessageHandler((message, sender, correlationId) -> {});
+        clusterNetSvc.messagingService().addMessageHandler((message, senderAddr, correlationId) -> {});
     }
 
     /**
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
index a60222c..4312ab9 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
@@ -58,6 +58,16 @@ public interface MessagingService {
     CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg, String correlationId);
 
     /**
+     * Same as {@link #send(ClusterNode, NetworkMessage, String)} but addresses the recipient by its network address.
+     *
+     * @param addr Recipient network address in host:port format.
+     * @param msg Message which should be delivered.
+     * @param correlationId Correlation id when replying to the request.
+     * @return Future of the send operation.
+     */
+    CompletableFuture<Void> send(String addr, NetworkMessage msg, String correlationId);
+
+    /**
      * Sends a message asynchronously with same guarantees as {@link #send(ClusterNode, NetworkMessage)} and
      * returns a future that will be completed successfully upon receiving a response.
      *
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
index 47f4737..94e2e0b 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
@@ -22,8 +22,9 @@ package org.apache.ignite.network;
 public interface NetworkMessageHandler {
     /**
      * @param message Message which was received from the cluster.
-     * @param sender Sender.
+     * @param senderAddr Sender address. Use
+     * {@link TopologyService#getByAddress(String)} to resolve a cluster node.
      * @param correlationId Correlation id.
      */
-    void onReceived(NetworkMessage message, ClusterNode sender, String correlationId);
+    void onReceived(NetworkMessage message, String senderAddr, String correlationId);
 }
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index 074cd4e..28c1956 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -78,7 +78,7 @@ class ITScaleCubeNetworkMessagingTest {
 
         for (ClusterService member : testCluster.members) {
             member.messagingService().addMessageHandler(
-                (message, sender, correlationId) -> {
+                (message, senderAddr, correlationId) -> {
                     messageStorage.put(member.localConfiguration().getName(), (TestMessage)message);
                     messageReceivedLatch.countDown();
                 }
@@ -137,11 +137,11 @@ class ITScaleCubeNetworkMessagingTest {
         class Data {
             private final TestMessage message;
 
-            private final ClusterNode sender;
+            private final String sender;
 
             private final String correlationId;
 
-            private Data(TestMessage message, ClusterNode sender, String correlationId) {
+            private Data(TestMessage message, String sender, String correlationId) {
                 this.message = message;
                 this.sender = sender;
                 this.correlationId = correlationId;
@@ -151,7 +151,8 @@ class ITScaleCubeNetworkMessagingTest {
         var dataFuture = new CompletableFuture<Data>();
 
         member.messagingService().addMessageHandler(
-            (message, sender, correlationId) -> dataFuture.complete(new Data((TestMessage)message, sender, correlationId))
+            (message, senderAddr, correlationId) ->
+                dataFuture.complete(new Data((TestMessage)message, senderAddr, correlationId))
         );
 
         var requestMessage = messageFactory.testMessage().msg("request").build();
@@ -162,7 +163,7 @@ class ITScaleCubeNetworkMessagingTest {
         Data actualData = dataFuture.get(3, TimeUnit.SECONDS);
 
         assertThat(actualData.message.msg(), is(requestMessage.msg()));
-        assertThat(actualData.sender, is(self));
+        assertThat(actualData.sender, is(self.address()));
         assertThat(actualData.correlationId, is(correlationId));
     }
 
@@ -181,7 +182,7 @@ class ITScaleCubeNetworkMessagingTest {
         var requestMessage = messageFactory.testMessage().msg("request").build();
         var responseMessage = messageFactory.testMessage().msg("response").build();
 
-        member.messagingService().addMessageHandler((message, sender, correlationId) -> {
+        member.messagingService().addMessageHandler((message, senderAddr, correlationId) -> {
             if (message.equals(requestMessage))
                 member.messagingService().send(self, responseMessage, correlationId);
         });
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index d7d2685..cc4c5d1 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -54,7 +54,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
 
         var topologyService = new ScaleCubeTopologyService();
 
-        var messagingService = new ScaleCubeMessagingService(topologyService);
+        var messagingService = new ScaleCubeMessagingService();
 
         var messageFactory = new NetworkMessagesFactory();
 
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index 6bf365b..a0c75e7 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -39,20 +39,6 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
     private Cluster cluster;
 
     /**
-     * Topology service.
-     */
-    private ScaleCubeTopologyService topologyService;
-
-    /**
-     * Constructor.
-     *
-     * @param topologyService Topology service.
-     */
-    ScaleCubeMessagingService(ScaleCubeTopologyService topologyService) {
-        this.topologyService = topologyService;
-    }
-
-    /**
      * Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency injection.
      *
      * @param cluster Cluster.
@@ -68,15 +54,11 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
      */
     void fireEvent(Message message) {
         NetworkMessage msg = message.data();
-        ClusterNode sender = topologyService.getByAddress(message.header(Message.HEADER_SENDER));
-
-        if (sender == null) // Ignore the message from the unknown node.
-            return;
 
         String correlationId = message.correlationId();
 
         for (NetworkMessageHandler handler : getMessageHandlers())
-            handler.onReceived(msg, sender, correlationId);
+            handler.onReceived(msg, message.header(Message.HEADER_SENDER), correlationId);
     }
 
     /** {@inheritDoc} */
@@ -95,13 +77,19 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
 
     /** {@inheritDoc} */
     @Override public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg, String correlationId) {
+        return send(recipient.address(), msg, correlationId);
+    }
+
+    @Override public CompletableFuture<Void> send(String addr, NetworkMessage msg, String correlationId) {
         var message = Message
             .withData(msg)
             .correlationId(correlationId)
             .build();
 
+        Address address = Address.from(addr);
+
         return cluster
-            .send(clusterNodeAddress(recipient), message)
+            .send(address, message)
             .toFuture();
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
index 0d04eec..bfccb55 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
@@ -333,6 +333,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
 
     /** {@inheritDoc} */
     @Override public void shutdown() {
+        // No-op.
     }
 
     /**
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
index 05b8a56..a7730be 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
@@ -104,15 +104,7 @@ public class ITCliServiceTest {
             cluster.startLearner(peer);
 
         cluster.waitLeader();
-
-        for (Node follower : cluster.getFollowers())
-            assertTrue(waitForCondition(() -> follower.getLeaderId() != null, 3_000));
-
-        for (PeerId learner : cluster.getLearners()) {
-            Node node = cluster.getNode(learner.getEndpoint());
-
-            assertTrue(waitForCondition(() -> node.getLeaderId() != null, 3_000));
-        }
+        cluster.ensureLeader(cluster.getLeader());
 
         cliService = new CliServiceImpl();
         conf = new Configuration(peers, learners);
@@ -187,11 +179,13 @@ public class ITCliServiceTest {
         PeerId learner3 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + LEARNER_PORT_STEP + 3);
         assertTrue(cluster.startLearner(learner3));
         sendTestTaskAndWait(cluster.getLeader(), 0);
-        sleep(500);
+        cluster.ensureSame(addr -> addr.equals(learner3.getEndpoint()));
+
         for (MockStateMachine fsm : cluster.getFsms()) {
             if (!fsm.getAddress().equals(learner3.getEndpoint()))
                 assertEquals(10, fsm.getLogs().size());
         }
+
         assertEquals(0, cluster.getFsmByPeer(learner3).getLogs().size());
         List<PeerId> oldLearners = new ArrayList<PeerId>(conf.getLearners());
         assertEquals(oldLearners, cliService.getLearners(groupId, conf));
@@ -199,13 +193,16 @@ public class ITCliServiceTest {
 
         // Add learner3
         cliService.addLearners(groupId, conf, Collections.singletonList(learner3));
-        sleep(1000);
-        assertEquals(10, cluster.getFsmByPeer(learner3).getLogs().size());
+
+        assertTrue(waitForCondition(() -> cluster.getFsmByPeer(learner3).getLogs().size() == 10, 5_000));
 
         sendTestTaskAndWait(cluster.getLeader(), 0);
-        sleep(1000);
+
+        cluster.ensureSame();
+
         for (MockStateMachine fsm : cluster.getFsms())
             assertEquals(20, fsm.getLogs().size());
+
         List<PeerId> newLearners = new ArrayList<>(oldLearners);
         newLearners.add(learner3);
         assertEquals(newLearners, cliService.getLearners(groupId, conf));
@@ -214,11 +211,14 @@ public class ITCliServiceTest {
         // Remove  3
         cliService.removeLearners(groupId, conf, Collections.singletonList(learner3));
         sendTestTaskAndWait(cluster.getLeader(), 0);
-        sleep(1000);
+
+        cluster.ensureSame(addr -> addr.equals(learner3.getEndpoint()));
+
         for (MockStateMachine fsm : cluster.getFsms()) {
             if (!fsm.getAddress().equals(learner3.getEndpoint()))
                 assertEquals(30, fsm.getLogs().size());
         }
+
         // Latest 10 logs are not replicated to learner3, because it's removed.
         assertEquals(20, cluster.getFsmByPeer(learner3).getLogs().size());
         assertEquals(oldLearners, cliService.getLearners(groupId, conf));
@@ -226,11 +226,13 @@ public class ITCliServiceTest {
 
         // Set learners into [learner3]
         cliService.resetLearners(groupId, conf, Collections.singletonList(learner3));
-        sleep(100);
-        assertEquals(30, cluster.getFsmByPeer(learner3).getLogs().size());
+
+        assertTrue(waitForCondition(() -> cluster.getFsmByPeer(learner3).getLogs().size() == 30, 5_000));
 
         sendTestTaskAndWait(cluster.getLeader(), 0);
-        sleep(1000);
+
+        cluster.ensureSame(addr -> oldLearners.contains(new PeerId(addr, 0)));
+
         // Latest 10 logs are not replicated to learner1 and learner2, because they were removed by resetting learners set.
         for (MockStateMachine fsm : cluster.getFsms()) {
             if (!oldLearners.contains(new PeerId(fsm.getAddress(), 0)))
@@ -238,6 +240,7 @@ public class ITCliServiceTest {
             else
                 assertEquals(30, fsm.getLogs().size());
         }
+
         assertEquals(Collections.singletonList(learner3), cliService.getLearners(groupId, conf));
         assertEquals(Collections.singletonList(learner3), cliService.getAliveLearners(groupId, conf));
 
@@ -253,15 +256,18 @@ public class ITCliServiceTest {
         PeerId peer3 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 3);
         assertTrue(cluster.start(peer3.getEndpoint()));
         sendTestTaskAndWait(cluster.getLeader(), 0);
-        sleep(100);
+        cluster.ensureSame(addr -> addr.equals(peer3.getEndpoint()));
         assertEquals(0, cluster.getFsmByPeer(peer3).getLogs().size());
 
         assertTrue(cliService.addPeer(groupId, conf, peer3).isOk());
-        sleep(100);
-        assertEquals(10, cluster.getFsmByPeer(peer3).getLogs().size());
+
+        assertTrue(waitForCondition(() -> cluster.getFsmByPeer(peer3).getLogs().size() == 10, 5_000));
         sendTestTaskAndWait(cluster.getLeader(), 0);
-        sleep(100);
+
         assertEquals(6, cluster.getFsms().size());
+
+        cluster.ensureSame();
+
         for (MockStateMachine fsm : cluster.getFsms())
             assertEquals(20, fsm.getLogs().size());
 
@@ -269,8 +275,11 @@ public class ITCliServiceTest {
         assertTrue(cliService.removePeer(groupId, conf, peer3).isOk());
         sleep(200);
         sendTestTaskAndWait(cluster.getLeader(), 0);
-        sleep(1000);
+
         assertEquals(6, cluster.getFsms().size());
+
+        cluster.ensureSame(addr -> addr.equals(peer3.getEndpoint()));
+
         for (MockStateMachine fsm : cluster.getFsms()) {
             if (fsm.getAddress().equals(peer3.getEndpoint()))
                 assertEquals(20, fsm.getLogs().size());
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
index 159bba9..5ac11c7 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
@@ -102,7 +102,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /**
- * Integration tests for raft cluster.
+ * Integration tests for raft cluster. TODO asch get rid of sleeps wherever possible IGNITE-14832
  */
 public class ITNodeTest {
     private static final Logger LOG = LoggerFactory.getLogger(ITNodeTest.class);
@@ -177,7 +177,8 @@ public class ITNodeTest {
         services.forEach(service -> {
             try {
                 service.shutdown();
-            } catch (Exception e) {
+            }
+            catch (Exception e) {
                 LOG.error("Error while closing a service", e);
             }
         });
@@ -204,7 +205,7 @@ public class ITNodeTest {
 
         RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions);
 
-        service.start(true);
+        service.start();
     }
 
     @Test
@@ -225,7 +226,7 @@ public class ITNodeTest {
 
         RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions);
 
-        Node node = service.start(true);
+        final Node node = service.start();
 
         assertEquals(1, node.listPeers().size());
         assertTrue(node.listPeers().contains(peer));
@@ -303,7 +304,7 @@ public class ITNodeTest {
 
         RaftGroupService service = createService("unittest", peer, nodeOptions);
 
-        Node node = service.start(true);
+        final Node node = service.start();
 
         assertEquals(1, node.listPeers().size());
         assertTrue(node.listPeers().contains(peer));
@@ -350,8 +351,8 @@ public class ITNodeTest {
             applyLatch.countDown();
 
             // The state machine is in error state, the node should step down.
-            while (node.isLeader())
-                Thread.sleep(10);
+            waitForCondition(() -> !node.isLeader(), 5_000);
+
             latch.await();
             applyCompleteLatch.await();
         }
@@ -416,51 +417,6 @@ public class ITNodeTest {
         waitLatch(latch);
     }
 
-    private void sendTestTaskAndWait(Node node) throws InterruptedException {
-        sendTestTaskAndWait(node, 0, 10, RaftError.SUCCESS);
-    }
-
-    private void sendTestTaskAndWait(Node node, int amount) throws InterruptedException {
-        sendTestTaskAndWait(node, 0, amount, RaftError.SUCCESS);
-    }
-
-    private void sendTestTaskAndWait(Node node, RaftError err) throws InterruptedException {
-        sendTestTaskAndWait(node, 0, 10, err);
-    }
-
-    private void sendTestTaskAndWait(Node node, int start, int amount,
-        RaftError err) throws InterruptedException {
-        CountDownLatch latch = new CountDownLatch(amount);
-        for (int i = start; i < start + amount; i++) {
-            ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes());
-            Task task = new Task(data, new ExpectClosure(err, latch));
-            node.apply(task);
-        }
-        waitLatch(latch);
-    }
-
-    private void sendTestTaskAndWait(Node node, int start,
-        RaftError err) throws InterruptedException {
-        sendTestTaskAndWait(node, start, 10, err);
-    }
-
-    @SuppressWarnings("SameParameterValue")
-    private void sendTestTaskAndWait(String prefix, Node node, int code) throws InterruptedException {
-        sendTestTaskAndWait(prefix, node, 10, code);
-    }
-
-    @SuppressWarnings("SameParameterValue")
-    private void sendTestTaskAndWait(String prefix, Node node, int amount,
-        int code) throws InterruptedException {
-        CountDownLatch latch = new CountDownLatch(10);
-        for (int i = 0; i < amount; i++) {
-            ByteBuffer data = ByteBuffer.wrap((prefix + i).getBytes());
-            Task task = new Task(data, new ExpectClosure(code, null, latch));
-            node.apply(task);
-        }
-        waitLatch(latch);
-    }
-
     @Test
     public void testTripleNodesWithReplicatorStateListener() throws Exception {
         List<PeerId> peers = TestUtils.generatePeers(3);
@@ -477,6 +433,7 @@ public class ITNodeTest {
 
         // elect leader
         cluster.waitLeader();
+        cluster.ensureLeader(cluster.getLeader());
 
         for (Node follower : cluster.getFollowers())
             waitForCondition(() -> follower.getLeaderId() != null, 5_000);
@@ -509,6 +466,8 @@ public class ITNodeTest {
         // get leader
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
+
         assertEquals(3, leader.listPeers().size());
         // apply tasks to leader
         sendTestTaskAndWait(leader);
@@ -571,8 +530,9 @@ public class ITNodeTest {
             assertTrue(cluster.start(peer.getEndpoint()));
 
         cluster.waitLeader();
-
         Node leader = cluster.getLeader();
+        cluster.ensureLeader(leader);
+
         sendTestTaskAndWait(leader);
         Thread.sleep(100);
         List<Node> followers = cluster.getFollowers();
@@ -608,6 +568,8 @@ public class ITNodeTest {
         // get leader
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
+
         assertEquals(3, leader.listPeers().size());
         // apply tasks to leader
         sendTestTaskAndWait(leader);
@@ -671,7 +633,7 @@ public class ITNodeTest {
                 .singletonList(learnerPeer)));
 
             learnerServer = createService("unittest", new PeerId(learnerAddr, 0), nodeOptions);
-            learnerServer.start(true);
+            learnerServer.start();
         }
 
         {
@@ -686,7 +648,7 @@ public class ITNodeTest {
                 .singletonList(learnerPeer)));
 
             RaftGroupService server = createService("unittest", new PeerId(addr, 0), nodeOptions);
-            Node node = server.start(true);
+            Node node = server.start();
 
             assertEquals(1, node.listPeers().size());
             assertTrue(node.listPeers().contains(peer));
@@ -730,12 +692,12 @@ public class ITNodeTest {
         cluster.waitLeader();
 
         Node leader = cluster.getLeader();
+        cluster.ensureLeader(leader);
 
         waitForCondition(() -> leader.listAlivePeers().size() == 3, 5_000);
         waitForCondition(() -> leader.listAliveLearners().size() == 3, 5_000);
 
         sendTestTaskAndWait(leader);
-        Thread.sleep(500);
         List<MockStateMachine> fsms = cluster.getFsms();
         assertEquals(6, fsms.size());
         cluster.ensureSame();
@@ -799,6 +761,7 @@ public class ITNodeTest {
         // elect leader
         cluster.waitLeader();
         Node leader = cluster.getLeader();
+        cluster.ensureLeader(leader);
 
         assertEquals(3, leader.listPeers().size());
         assertEquals(1, leader.listLearners().size());
@@ -835,6 +798,8 @@ public class ITNodeTest {
         // get leader
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
+
         assertEquals(3, leader.listPeers().size());
         assertTrue(leader.listLearners().isEmpty());
         assertTrue(leader.listAliveLearners().isEmpty());
@@ -901,6 +866,7 @@ public class ITNodeTest {
             assertTrue(done.await().isOk());
             assertEquals(2, leader.listAliveLearners().size());
             assertEquals(2, leader.listLearners().size());
+            cluster.ensureSame();
         }
         {
             // stop two followers
@@ -915,13 +881,11 @@ public class ITNodeTest {
             assertEquals(RaftError.EPERM, done.getStatus().getRaftError());
             // One peer with two learners.
             assertEquals(3, cluster.getFsms().size());
-            cluster.ensureSame();
         }
     }
 
     @Test
     public void testNodesWithPriorityElection() throws Exception {
-
         List<Integer> priorities = new ArrayList<>();
         priorities.add(100);
         priorities.add(40);
@@ -939,6 +903,8 @@ public class ITNodeTest {
         // get leader
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
+
         assertEquals(3, leader.listPeers().size());
         assertEquals(100, leader.getNodeTargetPriority());
         assertEquals(100, leader.getLeaderId().getPriority());
@@ -947,7 +913,6 @@ public class ITNodeTest {
 
     @Test
     public void testNodesWithPartPriorityElection() throws Exception {
-
         List<Integer> priorities = new ArrayList<>();
         priorities.add(100);
         priorities.add(40);
@@ -965,6 +930,8 @@ public class ITNodeTest {
         // get leader
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
+
         assertEquals(3, leader.listPeers().size());
         assertEquals(2, cluster.getFollowers().size());
     }
@@ -989,6 +956,8 @@ public class ITNodeTest {
         // get leader
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
+
         assertEquals(3, leader.listPeers().size());
         assertEquals(2, cluster.getFollowers().size());
     }
@@ -1013,6 +982,8 @@ public class ITNodeTest {
         // get leader
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
+
         assertEquals(3, leader.listPeers().size());
         assertEquals(2, cluster.getFollowers().size());
         assertEquals(50, leader.getNodeTargetPriority());
@@ -1056,6 +1027,7 @@ public class ITNodeTest {
 
         cluster.waitLeader();
         Node leader = cluster.getLeader();
+        cluster.ensureLeader(leader);
 
         assertNotNull(leader);
         assertEquals(100, leader.getNodeId().getPeerId().getPriority());
@@ -1110,6 +1082,8 @@ public class ITNodeTest {
         // get leader
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
+
         assertEquals(100, leader.getNodeTargetPriority());
         assertEquals(100, leader.getNodeId().getPeerId().getPriority());
 
@@ -1249,6 +1223,7 @@ public class ITNodeTest {
         // get leader
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+
         assertEquals(3, leader.listPeers().size());
         // apply tasks to leader
         sendTestTaskAndWait(leader);
@@ -1294,6 +1269,7 @@ public class ITNodeTest {
         // get leader
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+
         assertEquals(3, leader.listPeers().size());
         // apply tasks to leader
         sendTestTaskAndWait(leader);
@@ -1440,31 +1416,6 @@ public class ITNodeTest {
             assertEquals(10000, fsm.getLogs().size());
     }
 
-    @SuppressWarnings({"unused", "SameParameterValue"})
-    private boolean assertReadIndex(Node node, int index) throws InterruptedException {
-        CountDownLatch latch = new CountDownLatch(1);
-        byte[] requestContext = TestUtils.getRandomBytes();
-        AtomicBoolean success = new AtomicBoolean(false);
-        node.readIndex(requestContext, new ReadIndexClosure() {
-
-            @Override
-            public void run(Status status, long theIndex, byte[] reqCtx) {
-                if (status.isOk()) {
-                    assertEquals(index, theIndex);
-                    assertArrayEquals(requestContext, reqCtx);
-                    success.set(true);
-                }
-                else {
-                    assertTrue(status.getErrorMsg().contains("RPC exception:Check connection["), status.getErrorMsg());
-                    assertTrue(status.getErrorMsg().contains("] fail and try to create new one"), status.getErrorMsg());
-                }
-                latch.countDown();
-            }
-        });
-        latch.await();
-        return success.get();
-    }
-
     @Test
     public void testNodeMetrics() throws Exception {
         List<PeerId> peers = TestUtils.generatePeers(3);
@@ -1661,10 +1612,6 @@ public class ITNodeTest {
             assertEquals(10, fsm.getLogs().size());
     }
 
-    private void waitLatch(CountDownLatch latch) throws InterruptedException {
-        assertTrue(latch.await(30, TimeUnit.SECONDS));
-    }
-
     @Test
     public void testRemoveFollower() throws Exception {
         List<PeerId> peers = TestUtils.generatePeers(3);
@@ -1679,6 +1626,8 @@ public class ITNodeTest {
         // get leader
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
+
         // apply tasks to leader
         sendTestTaskAndWait(leader);
 
@@ -1739,6 +1688,8 @@ public class ITNodeTest {
         // get leader
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
+
         // apply tasks to leader
         sendTestTaskAndWait(leader);
 
@@ -1755,7 +1706,6 @@ public class ITNodeTest {
         CountDownLatch latch = new CountDownLatch(1);
         leader.removePeer(oldLeader, new ExpectClosure(latch));
         waitLatch(latch);
-        Thread.sleep(100);
 
         // elect new leader
         cluster.waitLeader();
@@ -1916,9 +1866,6 @@ public class ITNodeTest {
         LOG.info("start follower {}", followerAddr2);
         assertTrue(cluster.start(followerAddr2, true, 300));
 
-        // Make sure the leader has discovered new nodes.
-        assertTrue(waitForTopology(cluster, leaderAddr, 3, 30_000)); // Discovery may take a while sometimes.
-
         CountDownLatch latch = new CountDownLatch(1);
         LOG.info("Add old follower {}", followerAddr1);
         leader.addPeer(followerPeer1, new ExpectClosure(latch));
@@ -1942,7 +1889,7 @@ public class ITNodeTest {
      * @throws Exception
      */
     @Test
-    public void testRestoreSnasphot() throws Exception {
+    public void testRestoreSnapshot() throws Exception {
         List<PeerId> peers = TestUtils.generatePeers(3);
 
         cluster = new TestCluster("unitest", dataPath, peers);
@@ -2021,22 +1968,6 @@ public class ITNodeTest {
         assertEquals(1, fsm.getLoadSnapshotTimes());
     }
 
-    private void triggerLeaderSnapshot(TestCluster cluster, Node leader) throws InterruptedException {
-        triggerLeaderSnapshot(cluster, leader, 1);
-    }
-
-    private void triggerLeaderSnapshot(TestCluster cluster, Node leader, int times)
-        throws InterruptedException {
-        // trigger leader snapshot
-        // first snapshot will be triggered randomly
-        int snapshotTimes = cluster.getLeaderFsm().getSaveSnapshotTimes();
-        assertTrue(snapshotTimes == times - 1 || snapshotTimes == times, "snapshotTimes=" + snapshotTimes + ", times=" + times);
-        CountDownLatch latch = new CountDownLatch(1);
-        leader.snapshot(new ExpectClosure(latch));
-        waitLatch(latch);
-        assertEquals(snapshotTimes + 1, cluster.getLeaderFsm().getSaveSnapshotTimes());
-    }
-
     @Test
     public void testInstallSnapshotWithThrottle() throws Exception {
         List<PeerId> peers = TestUtils.generatePeers(3);
@@ -2050,6 +1981,8 @@ public class ITNodeTest {
         // get leader
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
+
         // apply tasks to leader
         sendTestTaskAndWait(leader);
 
@@ -2092,6 +2025,7 @@ public class ITNodeTest {
     }
 
     @Test // TODO add test for timeout on snapshot install https://issues.apache.org/jira/browse/IGNITE-14832
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-14943")
     public void testInstallLargeSnapshotWithThrottle() throws Exception {
         List<PeerId> peers = TestUtils.generatePeers(4);
         cluster = new TestCluster("unitest", dataPath, peers.subList(0, 3));
@@ -2104,6 +2038,8 @@ public class ITNodeTest {
         // get leader
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
+
         // apply tasks to leader
         sendTestTaskAndWait(leader, 0, RaftError.SUCCESS);
 
@@ -2160,6 +2096,8 @@ public class ITNodeTest {
         // get leader
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
+
         // apply tasks to leader
         sendTestTaskAndWait(leader, 0, RaftError.SUCCESS);
 
@@ -2205,7 +2143,6 @@ public class ITNodeTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-14853")
     public void testInstallSnapshot() throws Exception {
         List<PeerId> peers = TestUtils.generatePeers(3);
 
@@ -2218,6 +2155,8 @@ public class ITNodeTest {
         // get leader
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
+
         // apply tasks to leader
         sendTestTaskAndWait(leader);
 
@@ -2342,13 +2281,11 @@ public class ITNodeTest {
 
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
 
         List<Node> followers = cluster.getFollowers();
         assertEquals(2, followers.size());
 
-        // Ensure the quorum before stopping a follower, otherwise leader can step down.
-        assertTrue(waitForCondition(() -> followers.get(1).getLeaderId() != null, 5_000));
-
         Endpoint followerAddr = followers.get(0).getNodeId().getPeerId().getEndpoint().copy();
         assertTrue(cluster.stop(followerAddr));
 
@@ -2392,7 +2329,6 @@ public class ITNodeTest {
         PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
         LOG.info("Transfer leadership from {} to {}", leader, targetPeer);
         assertTrue(leader.transferLeadershipTo(targetPeer).isOk());
-        Thread.sleep(1000);
         cluster.waitLeader();
         leader = cluster.getLeader();
         assertEquals(leader.getNodeId().getPeerId(), targetPeer);
@@ -2411,13 +2347,11 @@ public class ITNodeTest {
 
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
 
         List<Node> followers = cluster.getFollowers();
         assertEquals(2, followers.size());
 
-        // Ensure the quorum before stopping a follower, otherwise leader can step down.
-        assertTrue(waitForCondition(() -> followers.get(1).getLeaderId() != null, 5_000));
-
         PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
         assertTrue(cluster.stop(targetPeer.getEndpoint()));
 
@@ -2453,6 +2387,7 @@ public class ITNodeTest {
 
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
 
         List<Node> followers = cluster.getFollowers();
         assertEquals(2, followers.size());
@@ -2521,7 +2456,7 @@ public class ITNodeTest {
             nodeOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0))));
 
             RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions);
-            Node node = service.start(true);
+            Node node = service.start();
 
             Thread.sleep(1000);
             sendTestTaskAndWait(node);
@@ -2544,7 +2479,7 @@ public class ITNodeTest {
 
             RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions);
             try {
-                service.start(true);
+                service.start();
 
                 fail();
             }
@@ -2635,7 +2570,7 @@ public class ITNodeTest {
 
         PeerId follower = followers.get(0).getNodeId().getPeerId();
         assertTrue(leader.transferLeadershipTo(follower).isOk());
-        Thread.sleep(2000);
+        cluster.waitLeader();
         leader = cluster.getLeader();
         assertEquals(follower, leader.getNodeId().getPeerId());
 
@@ -2722,6 +2657,8 @@ public class ITNodeTest {
         cluster.waitLeader();
         Node firstLeader = cluster.getLeader();
         assertNotNull(firstLeader);
+        cluster.ensureLeader(firstLeader);
+
         // apply something
         sendTestTaskAndWait(firstLeader);
 
@@ -2788,6 +2725,8 @@ public class ITNodeTest {
 
         Node leader = cluster.getLeader();
         assertNotNull(leader);
+        cluster.ensureLeader(leader);
+
         sendTestTaskAndWait(leader);
 
         // index == 1 is a CONFIGURATION log, so real_index will be 2 when returned.
@@ -2903,7 +2842,7 @@ public class ITNodeTest {
 
         RaftGroupService service = createService("test", new PeerId(addr, 0), nodeOpts);
 
-        Node node = service.start(true);
+        Node node = service.start();
         assertEquals(26, fsm.getLogs().size());
 
         for (int i = 0; i < 26; i++)
@@ -2941,7 +2880,7 @@ public class ITNodeTest {
 
         RaftGroupService service = createService("test", new PeerId(addr, 0), nodeOpts);
 
-        Node node = service.start(true);
+        Node node = service.start();
         while (!node.isLeader())
             Thread.sleep(20);
         sendTestTaskAndWait(node);
@@ -2949,7 +2888,6 @@ public class ITNodeTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-14852")
     public void testChangePeers() throws Exception {
         PeerId peer0 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT);
         cluster = new TestCluster("testChangePeers", dataPath, Collections.singletonList(peer0));
@@ -2975,7 +2913,12 @@ public class ITNodeTest {
             Status status = done.await();
             assertTrue(status.isOk(), status.getRaftError().toString());
         }
-        cluster.ensureSame();
+
+        cluster.waitLeader();
+
+        for (final MockStateMachine fsm : cluster.getFsms()) {
+            assertEquals(10, fsm.getLogs().size());
+        }
     }
 
     @Test
@@ -3349,7 +3292,7 @@ public class ITNodeTest {
 
         assertNull(cluster.getLeader());
 
-        Thread.sleep(3000);
+        Thread.sleep(2000);
 
         assertNull(cluster.getLeader());
 
@@ -3376,7 +3319,13 @@ public class ITNodeTest {
     }
 
     /**
-     * {@inheritDoc}
+     * TODO asch get rid of waiting for topology IGNITE-14832
+     *
+     * @param cluster
+     * @param addr
+     * @param expected
+     * @param timeout
+     * @return
      */
     private boolean waitForTopology(TestCluster cluster, Endpoint addr, int expected, long timeout) {
         RaftGroupService grp = cluster.getServer(addr);
@@ -3412,7 +3361,9 @@ public class ITNodeTest {
     }
 
     /**
-     * {@inheritDoc}
+     * @param cond The condition.
+     * @param timeout The timeout.
+     * @return {@code True} if the condition is satisfied.
      */
     private boolean waitForCondition(BooleanSupplier cond, long timeout) {
         long stop = System.currentTimeMillis() + timeout;
@@ -3453,7 +3404,7 @@ public class ITNodeTest {
 
         ClusterService clusterService = createClusterService(peerId.getEndpoint(), servers);
 
-        IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, servers, nodeManager);
+        IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, servers, nodeManager, nodeOptions);
 
         nodeOptions.setRpcClient(new IgniteRpcClient(clusterService));
 
@@ -3484,4 +3435,94 @@ public class ITNodeTest {
 
         return clusterServiceFactory.createClusterService(clusterConfig);
     }
+
+    private void sendTestTaskAndWait(final Node node) throws InterruptedException {
+        this.sendTestTaskAndWait(node, 0, 10, RaftError.SUCCESS);
+    }
+
+    private void sendTestTaskAndWait(final Node node, int amount) throws InterruptedException {
+        this.sendTestTaskAndWait(node, 0, amount, RaftError.SUCCESS);
+    }
+
+    private void sendTestTaskAndWait(final Node node, final RaftError err) throws InterruptedException {
+        this.sendTestTaskAndWait(node, 0, 10, err);
+    }
+
+    private void sendTestTaskAndWait(final Node node, final int start, int amount,
+                                     final RaftError err) throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(amount); // One count per task applied in the loop below.
+        for (int i = start; i < start + amount; i++) {
+            final ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes()); // Payload is "hello" + index, encoded with the platform charset.
+            final Task task = new Task(data, new ExpectClosure(err, latch)); // Closure is given the expected error and the shared latch.
+            node.apply(task);
+        }
+        waitLatch(latch); // Fails the test if the latch is not released within 30 seconds.
+    }
+
+    private void sendTestTaskAndWait(final Node node, final int start,
+                                     final RaftError err) throws InterruptedException {
+        sendTestTaskAndWait(node, start, 10, err);
+    }
+
+    @SuppressWarnings("SameParameterValue")
+    private void sendTestTaskAndWait(final String prefix, final Node node, final int code) throws InterruptedException {
+        sendTestTaskAndWait(prefix, node, 10, code);
+    }
+
+    @SuppressWarnings("SameParameterValue")
+    private void sendTestTaskAndWait(final String prefix, final Node node, int amount,
+                                     final int code) throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(amount); // Sized by amount (was hard-coded 10): one count per submitted task.
+        for (int i = 0; i < amount; i++) {
+            final ByteBuffer data = ByteBuffer.wrap((prefix + i).getBytes()); // Payload is prefix + index, platform charset.
+            final Task task = new Task(data, new ExpectClosure(code, null, latch)); // Closure is given the expected code and the shared latch.
+            node.apply(task);
+        }
+        waitLatch(latch); // Fails the test if the latch is not released within 30 seconds.
+    }
+
+    private void triggerLeaderSnapshot(TestCluster cluster, Node leader) throws InterruptedException {
+        triggerLeaderSnapshot(cluster, leader, 1);
+    }
+
+    private void triggerLeaderSnapshot(TestCluster cluster, Node leader, int times)
+        throws InterruptedException {
+        // trigger leader snapshot
+        // first snapshot will be triggered randomly
+        int snapshotTimes = cluster.getLeaderFsm().getSaveSnapshotTimes();
+        assertTrue(snapshotTimes == times - 1 || snapshotTimes == times, "snapshotTimes=" + snapshotTimes + ", times=" + times);
+        CountDownLatch latch = new CountDownLatch(1);
+        leader.snapshot(new ExpectClosure(latch));
+        waitLatch(latch);
+        assertEquals(snapshotTimes + 1, cluster.getLeaderFsm().getSaveSnapshotTimes());
+    }
+
+    private void waitLatch(final CountDownLatch latch) throws InterruptedException {
+        assertTrue(latch.await(30, TimeUnit.SECONDS));
+    }
+
+    @SuppressWarnings({"unused", "SameParameterValue"})
+    private boolean assertReadIndex(Node node, int index) throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1); // Released by the read-index callback below.
+        byte[] requestContext = TestUtils.getRandomBytes();
+        AtomicBoolean success = new AtomicBoolean(false); // Set only when the read completes with an OK status.
+        node.readIndex(requestContext, new ReadIndexClosure() {
+
+            @Override
+            public void run(Status status, long theIndex, byte[] reqCtx) {
+                if (status.isOk()) {
+                    assertEquals(index, theIndex);
+                    assertArrayEquals(requestContext, reqCtx);
+                    success.set(true);
+                }
+                else {
+                    assertTrue(status.getErrorMsg().contains("RPC exception:Check connection["), status.getErrorMsg());
+                    assertTrue(status.getErrorMsg().contains("] fail and try to create new one"), status.getErrorMsg());
+                }
+                latch.countDown();
+            }
+        });
+        waitLatch(latch); // Bounded wait: a failed assertion in the callback skips countDown(), which would otherwise hang forever.
+        return success.get();
+    }
 }
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
index c07a815..287a1d7 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
 import org.apache.ignite.raft.client.service.CommandClosure;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
 import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.raft.jraft.util.Utils;
 import org.junit.jupiter.api.AfterEach;
@@ -43,6 +44,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 
+import static org.apache.ignite.raft.jraft.core.State.STATE_ERROR;
+import static org.apache.ignite.raft.jraft.core.State.STATE_LEADER;
 import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
 import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
 import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
@@ -160,7 +163,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
 
         servers.add(server);
 
-        assertTrue(waitForTopology(service, servers.size(), 5_000));
+        assertTrue(waitForTopology(service, servers.size(), 15_000));
 
         return server;
     }
@@ -386,6 +389,11 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
         client1.refreshLeader().get();
         client2.refreshLeader().get();
 
+        NodeImpl leader = servers.stream().map(s -> ((NodeImpl) s.raftGroupService(COUNTER_GROUP_0).getRaftNode())).
+            filter(n -> n.getState() == STATE_LEADER).findFirst().orElse(null);
+
+        assertNotNull(leader);
+
         long val1 = applyIncrements(client1, 1, 5);
         long val2 = applyIncrements(client2, 1, 7);
 
@@ -407,11 +415,20 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
             assertTrue(cause instanceof RaftException);
         }
 
+        NodeImpl finalLeader = leader;
+        waitForCondition(() -> finalLeader.getState() == STATE_ERROR, 5_000);
+
+        // Client can't switch to new leader, because only one peer in the list.
         try {
             client1.<Long>run(new IncrementAndGetCommand(11)).get();
         }
         catch (Exception e) {
-            assertTrue(e.getCause() instanceof TimeoutException, "New leader should not get elected");
+            boolean isValid = e.getCause() instanceof TimeoutException;
+
+            if (!isValid)
+                LOG.error("Got unexpected exception", e);
+
+            assertTrue(isValid, "Expecting the timeout");
         }
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
index e29af9b..db9a3e8 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
@@ -173,7 +173,7 @@ public class JRaftServerImpl implements RaftServer {
         final RaftGroupService server = new RaftGroupService(groupId, new PeerId(endpoint, 0,
             ElectionPriority.DISABLED), nodeOptions, rpcServer, nodeManager, true);
 
-        server.start(false);
+        server.start();
 
         groups.put(groupId, server);
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
index 1139159..08bcb3e 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.function.BiConsumer;
 import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.raft.client.Command;
 import org.apache.ignite.raft.client.Peer;
@@ -90,11 +89,11 @@ public class RaftServerImpl implements RaftServer {
         readQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
         writeQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
 
-        service.messagingService().addMessageHandler((message, sender, correlationId) -> {
+        service.messagingService().addMessageHandler((message, senderAddr, correlationId) -> {
             if (message instanceof GetLeaderRequest) {
                 GetLeaderResponse resp = clientMsgFactory.getLeaderResponse().leader(new Peer(service.topologyService().localMember().address())).build();
 
-                service.messagingService().send(sender, resp, correlationId);
+                service.messagingService().send(senderAddr, resp, correlationId);
             }
             else if (message instanceof ActionRequest) {
                 ActionRequest req0 = (ActionRequest)message;
@@ -102,15 +101,15 @@ public class RaftServerImpl implements RaftServer {
                 RaftGroupListener lsnr = listeners.get(req0.groupId());
 
                 if (lsnr == null) {
-                    sendError(sender, correlationId, RaftErrorCode.ILLEGAL_STATE);
+                    sendError(senderAddr, correlationId, RaftErrorCode.ILLEGAL_STATE);
 
                     return;
                 }
 
                 if (req0.command() instanceof ReadCommand)
-                    handleActionRequest(sender, req0, correlationId, readQueue, lsnr);
+                    handleActionRequest(senderAddr, req0, correlationId, readQueue, lsnr);
                 else
-                    handleActionRequest(sender, req0, correlationId, writeQueue, lsnr);
+                    handleActionRequest(senderAddr, req0, correlationId, writeQueue, lsnr);
             }
             // TODO https://issues.apache.org/jira/browse/IGNITE-14775
         });
@@ -172,7 +171,7 @@ public class RaftServerImpl implements RaftServer {
      * @param <T> Command type.
      */
     private <T extends Command> void handleActionRequest(
-        ClusterNode sender,
+        String sender,
         ActionRequest req,
         String corellationId,
         BlockingQueue<CommandClosureEx<T>> queue,
@@ -223,7 +222,7 @@ public class RaftServerImpl implements RaftServer {
         }
     }
 
-    private void sendError(ClusterNode sender, String corellationId, RaftErrorCode errorCode) {
+    private void sendError(String sender, String corellationId, RaftErrorCode errorCode) {
         RaftErrorResponse resp = clientMsgFactory.raftErrorResponse().errorCode(errorCode).build();
 
         service.messagingService().send(sender, resp, corellationId);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
index 846860e..e3c06de 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
@@ -110,15 +110,6 @@ public class RaftGroupService {
      * Starts the raft group service, returns the raft node.
      */
     public synchronized Node start() {
-        return start(true);
-    }
-
-    /**
-     * Starts the raft group service, returns the raft node.
-     *
-     * @param startRpcServer whether to start RPC server.
-     */
-    public synchronized Node start(final boolean startRpcServer) {
         if (this.started) {
             return this.node;
         }
@@ -132,6 +123,14 @@ public class RaftGroupService {
 
         assert this.nodeOptions.getRpcClient() != null;
 
+        // Should start RPC server before node initialization to avoid race.
+        if (!sharedRpcServer) {
+            this.rpcServer.init(null);
+        }
+        else {
+            LOG.info("RPC server is shared by RaftGroupService.");
+        }
+
         this.node = new NodeImpl(groupId, serverId);
 
         if (!this.node.init(this.nodeOptions)) {
@@ -148,13 +147,6 @@ public class RaftGroupService {
             throw new IgniteInternalException("Fail to init node, please see the logs to find the reason.");
         }
 
-        if (startRpcServer) {
-            this.rpcServer.init(null);
-        }
-        else {
-            LOG.warn("RPC server is not started in RaftGroupService.");
-        }
-
         this.nodeManager.add(this.node);
         this.started = true;
         LOG.info("Start the RaftGroupService successfully {}", this.node.getNodeId());
@@ -163,14 +155,9 @@ public class RaftGroupService {
 
     public synchronized void shutdown() {
         // TODO asch remove handlers before shutting down raft node https://issues.apache.org/jira/browse/IGNITE-14519
-        if (!this.started) {
-            return;
-        }
-        if (this.rpcServer != null) {
+        if (this.rpcServer != null && !this.sharedRpcServer) {
             try {
-                if (!this.sharedRpcServer) {
-                    this.rpcServer.shutdown();
-                }
+                this.rpcServer.shutdown();
             }
             catch (Exception e) {
                 LOG.error("Failed to shutdown the server", e);
@@ -178,6 +165,10 @@ public class RaftGroupService {
             this.rpcServer = null;
         }
 
+        if (!this.started) {
+            return;
+        }
+
         this.node.shutdown();
         try {
             this.node.join();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 5855826..38ff515 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -1930,8 +1930,8 @@ public class NodeImpl implements Node, RaftServerService {
             if (localPrevLogTerm != prevLogTerm) {
                 final long lastLogIndex = this.logManager.getLastLogIndex();
 
-                LOG.warn(
-                    "Node {} reject term_unmatched AppendEntriesRequest from {}, term={}, prevLogIndex={}, prevLogTerm={}, localPrevLogTerm={}, lastLogIndex={}, entriesSize={}.",
+                LOG.warn("Node {} reject term_unmatched AppendEntriesRequest from {}, term={}, prevLogIndex={}, " +
+                        "prevLogTerm={}, localPrevLogTerm={}, lastLogIndex={}, entriesSize={}.",
                     getNodeId(), request.getServerId(), request.getTerm(), prevLogIndex, prevLogTerm, localPrevLogTerm,
                     lastLogIndex, entriesCount);
 
@@ -3508,6 +3508,13 @@ public class NodeImpl implements Node, RaftServerService {
         }
     }
 
+    /**
+     * @return The current value of this node's {@code state} field (read without taking any lock in this method).
+     */
+    public State getState() {
+        return state;
+    }
+
     @Override
     public String toString() {
         return "JRaftNode [nodeId=" + getNodeId() + "]";
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RemotingException.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RemotingException.java
index a5861ea..0901225 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RemotingException.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RemotingException.java
@@ -20,7 +20,6 @@ package org.apache.ignite.raft.jraft.error;
  * Exception for default remoting problems.
  */
 public class RemotingException extends Exception {
-
     private static final long serialVersionUID = -6326244159775972292L;
 
     public RemotingException() {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
index 5e473c8..6251d8b 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
@@ -16,6 +16,7 @@
  */
 package org.apache.ignite.raft.jraft.rpc;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.network.TopologyEventHandler;
 import org.apache.ignite.raft.jraft.Lifecycle;
 import org.apache.ignite.raft.jraft.error.RemotingException;
@@ -28,10 +29,11 @@ import org.jetbrains.annotations.Nullable;
  */
 public interface RpcClient extends Lifecycle<RpcOptions> {
     /**
-     * Check connection for given address. // TODO asch rename to isAlive.
+     * Check connection for given address.
      *
      * @param endpoint target address
      * @return true if there is a connection and the connection is active and writable.
+     * @deprecated // TODO asch remove IGNITE-14832
      */
     boolean checkConnection(Endpoint endpoint);
 
@@ -43,44 +45,6 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
     void registerConnectEventListener(TopologyEventHandler handler);
 
     /**
-     * Synchronous invocation.
-     *
-     * @param endpoint target address
-     * @param request request object
-     * @param timeoutMs timeout millisecond
-     * @return invoke result
-     */
-    default Object invokeSync(Endpoint endpoint, Object request, long timeoutMs)
-        throws InterruptedException, RemotingException {
-        return invokeSync(endpoint, request, null, timeoutMs);
-    }
-
-    /**
-     * Synchronous invocation using a invoke context.
-     *
-     * @param endpoint target address
-     * @param request request object
-     * @param ctx invoke context
-     * @param timeoutMs timeout millisecond
-     * @return invoke result
-     */
-    Object invokeSync(Endpoint endpoint, Object request, @Nullable InvokeContext ctx,
-        long timeoutMs) throws InterruptedException, RemotingException;
-
-    /**
-     * Asynchronous invocation with a callback.
-     *
-     * @param endpoint target address
-     * @param request request object
-     * @param callback invoke callback
-     * @param timeoutMs timeout millisecond
-     */
-    default void invokeAsync(Endpoint endpoint, Object request, InvokeCallback callback,
-        long timeoutMs) throws InterruptedException, RemotingException {
-        invokeAsync(endpoint, request, null, callback, timeoutMs);
-    }
-
-    /**
      * Asynchronous invocation with a callback.
      *
      * @param endpoint target address
@@ -88,8 +52,14 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
      * @param ctx invoke context
      * @param callback invoke callback
      * @param timeoutMs timeout millisecond
+     *
+     * @return The future.
      */
-    void invokeAsync(Endpoint endpoint, Object request, @Nullable InvokeContext ctx,
+    CompletableFuture<Message> invokeAsync(
+        Endpoint endpoint,
+        Object request,
+        @Nullable InvokeContext ctx,
         InvokeCallback callback,
-        long timeoutMs) throws InterruptedException, RemotingException;
+        long timeoutMs
+    ) throws InterruptedException, RemotingException;
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestClosure.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestClosure.java
index 5199d8c..809423e 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestClosure.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestClosure.java
@@ -58,6 +58,7 @@ public class RpcRequestClosure implements Closure {
             LOG.warn("A response: {} sent repeatedly!", msg);
             return;
         }
+
         this.rpcCtx.sendResponse(msg);
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestProcessor.java
index 112ab25..68d2d6f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestProcessor.java
@@ -46,6 +46,7 @@ public abstract class RpcRequestProcessor<T extends Message> implements RpcProce
     public void handleRequest(final RpcContext rpcCtx, final T request) {
         try {
             final Message msg = processRequest(request, new RpcRequestClosure(rpcCtx, this.defaultResp));
+
             if (msg != null) {
                 rpcCtx.sendResponse(msg);
             }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcResponseClosureAdapter.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcResponseClosureAdapter.java
index 086c9cd..da6581a 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcResponseClosureAdapter.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcResponseClosureAdapter.java
@@ -19,13 +19,9 @@ package org.apache.ignite.raft.jraft.rpc;
 /**
  * RpcResponseClosure adapter holds the response.
  *
- *
- * 2018-Mar-29 2:30:35 PM
- *
  * @param <T>
  */
 public abstract class RpcResponseClosureAdapter<T extends Message> implements RpcResponseClosure<T> {
-
     private T resp;
 
     public T getResponse() {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
index 727f886..21dc6c8 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
@@ -18,6 +18,7 @@ package org.apache.ignite.raft.jraft.rpc.impl;
 
 import java.net.ConnectException;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -39,6 +40,7 @@ import org.apache.ignite.raft.jraft.rpc.RpcResponseClosure;
 import org.apache.ignite.raft.jraft.util.Endpoint;
 import org.apache.ignite.raft.jraft.util.Utils;
 import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.raft.jraft.util.internal.ThrowUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,7 +57,7 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
     /**
      * The set of pinged addresses
      */
-    private Set<String> readyAddresses = new ConcurrentHashSet<>();
+    protected Set<String> readyAddresses = new ConcurrentHashSet<>();
 
     public RpcClient getRpcClient() {
         return this.rpcClient;
@@ -120,24 +122,28 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
                 return true;
 
             try {
-                final RpcRequests.PingRequest req = RpcRequests.PingRequest.newBuilder() //
-                    .setSendTimestamp(System.currentTimeMillis()) //
+                final RpcRequests.PingRequest req = RpcRequests.PingRequest.newBuilder()
+                    .setSendTimestamp(System.currentTimeMillis())
                     .build();
 
-                final ErrorResponse resp = (ErrorResponse) rc.invokeSync(endpoint, req,
-                    this.rpcOptions.getRpcConnectTimeoutMs());
+                Future<Message> fut =
+                    invokeWithDone(endpoint, req, null, null, rpcOptions.getRpcConnectTimeoutMs(), rpcExecutor);
 
-                if (resp.getErrorCode() == 0) {
+                final ErrorResponse resp = (ErrorResponse) fut.get(); // Future will be certainly terminated by timeout.
+
+                if (resp != null && resp.getErrorCode() == 0) {
                     readyAddresses.add(endpoint.toString());
 
                     return true;
                 }
+                else
+                    return false;
             }
             catch (final InterruptedException e) {
                 Thread.currentThread().interrupt();
             }
-            catch (final RemotingException e) {
-                LOG.error("Fail to connect {}, remoting exception: {}.", endpoint, e.getMessage());
+            catch (final ExecutionException e) {
+                LOG.error("Fail to connect {}, exception: {}.", endpoint, e.getMessage());
             }
         }
 
@@ -169,6 +175,7 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
         final RpcClient rc = this.rpcClient;
         final FutureImpl<Message> future = new FutureImpl<>();
         final Executor currExecutor = rpcExecutor != null ? rpcExecutor : this.rpcExecutor;
+
         try {
             if (rc == null) {
                 // TODO asch replace with ignite exception, check all places IGNITE-14832
@@ -179,14 +186,9 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
                 return future;
             }
 
-            rc.invokeAsync(endpoint, request, ctx, new InvokeCallback() {
+            return rc.invokeAsync(endpoint, request, ctx, new InvokeCallback() {
                 @Override
                 public void complete(final Object result, final Throwable err) {
-                    if (future.isCancelled()) {
-                        onCanceled(request, done);
-                        return;
-                    }
-
                     if (err == null) {
                         Status status = Status.OK();
                         Message msg;
@@ -213,8 +215,8 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
                         }
                     }
                     else {
-                        if (err instanceof ConnectException)
-                            readyAddresses.remove(endpoint); // Force logical reconnect.
+                        if (ThrowUtil.hasCause(err, null, ConnectException.class))
+                            readyAddresses.remove(endpoint.toString()); // Force logical reconnect.
 
                         if (done != null) {
                             try {
@@ -249,7 +251,6 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
             // should be in another thread to avoid dead locking.
             Utils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTERNAL,
                 "Fail to send a RPC request:" + e.getMessage()));
-
         }
 
         return future;
@@ -261,16 +262,4 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
         status.setErrorMsg(eResp.getErrorMsg());
         return status;
     }
-
-    private <T extends Message> void onCanceled(final Message request, final RpcResponseClosure<T> done) {
-        if (done != null) {
-            try {
-                done.run(new Status(RaftError.ECANCELED, "RPC request was canceled by future."));
-            }
-            catch (final Throwable t) {
-                LOG.error("Fail to run RpcResponseClosure, the request is {}.", request, t);
-            }
-        }
-    }
-
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
index 6d23508..1cd2909 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
@@ -33,6 +33,7 @@ import org.apache.ignite.raft.jraft.error.RemotingException;
 import org.apache.ignite.raft.jraft.option.RpcOptions;
 import org.apache.ignite.raft.jraft.rpc.InvokeCallback;
 import org.apache.ignite.raft.jraft.rpc.InvokeContext;
+import org.apache.ignite.raft.jraft.rpc.Message;
 import org.apache.ignite.raft.jraft.rpc.RpcClientEx;
 import org.apache.ignite.raft.jraft.util.Endpoint;
 import org.apache.ignite.raft.jraft.util.Utils;
@@ -68,52 +69,14 @@ public class IgniteRpcClient implements RpcClientEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Object invokeSync(Endpoint endpoint, Object request, InvokeContext ctx,
-        long timeoutMs) throws InterruptedException, RemotingException {
-        if (!checkConnection(endpoint))
-            throw new RemotingException("Server is dead " + endpoint);
-
-        CompletableFuture<Object> fut = new CompletableFuture();
-
-        // Future hashcode used as corellation id.
-        if (recordPred != null && recordPred.test(request, endpoint.toString()))
-            recordedMsgs.add(new Object[] {request, endpoint.toString(), fut.hashCode(), System.currentTimeMillis(), null});
-
-        boolean wasBlocked;
-
-        synchronized (this) {
-            wasBlocked = blockPred != null && blockPred.test(request, endpoint.toString());
-
-            if (wasBlocked)
-                blockedMsgs.add(new Object[] {request, endpoint.toString(), fut.hashCode(), System.currentTimeMillis(), (Runnable) () -> send(endpoint, request, fut, timeoutMs)});
-        }
-
-        if (!wasBlocked)
-            send(endpoint, request, fut, timeoutMs);
-
-        try {
-            return fut.whenComplete((res, err) -> {
-                assert !(res == null && err == null) : res + " " + err;
-
-                if (err == null && recordPred != null && recordPred.test(res, this.toString()))
-                    recordedMsgs.add(new Object[] {res, this.toString(), fut.hashCode(), System.currentTimeMillis(), null});
-            }).get(timeoutMs, TimeUnit.MILLISECONDS);
-        }
-        catch (ExecutionException e) {
-            throw new RemotingException(e);
-        }
-        catch (TimeoutException e) {
-            throw new InvokeTimeoutException();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void invokeAsync(Endpoint endpoint, Object request, InvokeContext ctx, InvokeCallback callback,
-        long timeoutMs) throws InterruptedException, RemotingException {
-        if (!checkConnection(endpoint))
-            throw new RemotingException("Server is dead " + endpoint);
-
-        CompletableFuture<Object> fut = new CompletableFuture<>();
+    @Override public CompletableFuture<Message> invokeAsync(
+        Endpoint endpoint,
+        Object request,
+        InvokeContext ctx,
+        InvokeCallback callback,
+        long timeoutMs
+    ) throws InterruptedException, RemotingException {
+        CompletableFuture<Message> fut = new CompletableFuture<>();
 
         fut.orTimeout(timeoutMs, TimeUnit.MILLISECONDS).
             whenComplete((res, err) -> {
@@ -146,14 +109,16 @@ public class IgniteRpcClient implements RpcClientEx {
                     }
                 }});
 
-                return;
+                return fut;
             }
         }
 
         send(endpoint, request, fut, timeoutMs);
+
+        return fut;
     }
 
-    public void send(Endpoint endpoint, Object request, CompletableFuture<Object> fut, long timeout) {
+    public void send(Endpoint endpoint, Object request, CompletableFuture<Message> fut, long timeout) {
         CompletableFuture<NetworkMessage> fut0 = service.messagingService().invoke(endpoint.toString(), (NetworkMessage) request, timeout);
 
         fut0.whenComplete(new BiConsumer<NetworkMessage, Throwable>() {
@@ -161,7 +126,7 @@ public class IgniteRpcClient implements RpcClientEx {
                 if (err != null)
                     fut.completeExceptionally(err);
                 else
-                    fut.complete(resp);
+                    fut.complete((Message) resp);
             }
         });
     }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 31404a1..5492264 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -84,7 +84,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
         registerProcessor(new GetFileRequestProcessor(rpcExecutor));
         registerProcessor(new InstallSnapshotRequestProcessor(rpcExecutor));
         registerProcessor(new RequestVoteRequestProcessor(rpcExecutor));
-        registerProcessor(new PingRequestProcessor(rpcExecutor));
+        registerProcessor(new PingRequestProcessor(rpcExecutor)); // TODO asch this should go last.
         registerProcessor(new TimeoutNowRequestProcessor(rpcExecutor));
         registerProcessor(new ReadIndexRequestProcessor(rpcExecutor));
         // raft native cli service
@@ -104,7 +104,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
         registerProcessor(new ActionRequestProcessor(rpcExecutor, FACTORY));
         registerProcessor(new org.apache.ignite.raft.jraft.rpc.impl.client.SnapshotRequestProcessor(rpcExecutor, FACTORY));
 
-        service.messagingService().addMessageHandler((msg, sender, corellationId) -> {
+        service.messagingService().addMessageHandler((msg, senderAddr, corellationId) -> {
             Class<? extends NetworkMessage> cls = msg.getClass();
             RpcProcessor<NetworkMessage> prc = processors.get(cls.getName());
 
@@ -143,11 +143,11 @@ public class IgniteRpcServer implements RpcServer<Void> {
                     }
 
                     @Override public void sendResponse(Object responseObj) {
-                        service.messagingService().send(sender, (NetworkMessage) responseObj, corellationId);
+                        service.messagingService().send(senderAddr, (NetworkMessage) responseObj, corellationId);
                     }
 
                     @Override public String getRemoteAddress() {
-                        return sender.address();
+                        return senderAddr;
                     }
 
                     @Override public String getLocalAddress() {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
index add5d94..f0f88e7 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
@@ -394,7 +394,7 @@ public class AppendEntriesRequestProcessor extends NodeRequestProcessor<AppendEn
     }
 
     private int getAndIncrementSequence(final String groupId, final PeerPair pair, NodeManager nodeManager) {
-        // TODO asch can use getPeerContext because it must already present (created before) ???
+        // TODO asch can use getPeerContext because it must already be present (created before) ??? IGNITE-14832
         return getOrCreatePeerRequestContext(groupId, pair, nodeManager).getAndIncrementSequence();
     }
 
@@ -416,11 +416,14 @@ public class AppendEntriesRequestProcessor extends NodeRequestProcessor<AppendEn
 
             boolean isHeartbeat = isHeartbeatRequest(request);
             int reqSequence = -1;
+
             if (!isHeartbeat) {
                 reqSequence = getAndIncrementSequence(groupId, pair, done.getRpcCtx().getNodeManager());
             }
+
             final Message response = service.handleAppendEntriesRequest(request, new SequenceRpcRequestClosure(done,
                 defaultResp(), groupId, pair, reqSequence, isHeartbeat));
+
             if (response != null) {
                 // heartbeat or probe request
                 if (isHeartbeat) {
@@ -430,6 +433,7 @@ public class AppendEntriesRequestProcessor extends NodeRequestProcessor<AppendEn
                     sendSequenceResponse(groupId, pair, reqSequence, done.getRpcCtx(), response);
                 }
             }
+
             return null;
         }
         else {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/internal/ThrowUtil.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/internal/ThrowUtil.java
index ea599ac..6b1b4e0 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/internal/ThrowUtil.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/internal/ThrowUtil.java
@@ -16,24 +16,16 @@
  */
 package org.apache.ignite.raft.jraft.util.internal;
 
+import org.jetbrains.annotations.Nullable;
+
 /**
  * Throwing tool.
  */
 public final class ThrowUtil {
-
-    private static final ReferenceFieldUpdater<Throwable, Throwable> causeUpdater = Updaters.newReferenceFieldUpdater(
-        Throwable.class, "cause");
-
     /**
      * Raises an exception bypassing compiler checks for checked exceptions.
      */
     public static void throwException(final Throwable t) {
-//        if (UnsafeUtil.hasUnsafe()) {
-//            UnsafeUtil.throwException(t);
-//        } else {
-//            ThrowUtil.throwException0(t);
-//        }
-
         ThrowUtil.throwException0(t);
     }
 
@@ -46,17 +38,47 @@ public final class ThrowUtil {
         throw (E) t;
     }
 
-    public static <T extends Throwable> T cutCause(final T cause) {
-        Throwable rootCause = cause;
-        while (rootCause.getCause() != null) {
-            rootCause = rootCause.getCause();
-        }
+    /**
+     * Checks if passed in {@code 'Throwable'} has given class in {@code 'cause'} hierarchy
+     * <b>including</b> that throwable itself.
+     * <p>
+     * Note that this method also includes {@link Throwable#getSuppressed()}
+     * in the check.
+     *
+     * @param t Throwable to check (if {@code null}, {@code false} is returned).
+     * @param msg Message text that the matching cause's message should contain (if {@code null}, the message is not checked).
+     * @param cls Cause classes to check (if {@code null} or empty, {@code false} is returned).
+     * @return {@code True} if one of the causing exceptions is an instance of one of the passed in classes,
+     *      {@code false} otherwise.
+     */
+    public static boolean hasCause(@Nullable Throwable t, @Nullable String msg, Class<?> @Nullable... cls) {
+        if (t == null || cls == null || cls.length == 0)
+            return false;
+
+        for (Throwable th = t; th != null; th = th.getCause()) {
+            for (Class<?> c : cls) {
+                if (c.isAssignableFrom(th.getClass())) {
+                    if (msg != null) {
+                        if (th.getMessage() != null && th.getMessage().contains(msg))
+                            return true;
+                        else
+                            continue;
+                    }
+
+                    return true;
+                }
+            }
 
-        if (rootCause != cause) {
-            cause.setStackTrace(rootCause.getStackTrace());
-            causeUpdater.set(cause, cause);
+            for (Throwable n : th.getSuppressed()) {
+                if (hasCause(n, msg, cls))
+                    return true;
+            }
+
+            if (th.getCause() == th)
+                break;
         }
-        return cause;
+
+        return false;
     }
 
     private ThrowUtil() {
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 92fe82a..6dd2f90 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.ClusterLocalConfiguration;
@@ -51,14 +52,20 @@ import org.apache.ignite.raft.jraft.util.Endpoint;
 import org.apache.ignite.raft.jraft.util.Utils;
 import org.jetbrains.annotations.Nullable;
 
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 /**
  * Test cluster for NodeTest
  */
 public class TestCluster {
-    /** Default election timeout. */
-    private static final int ELECTION_TIMEOUT = 300;
+    /**
+     * Default election timeout.
+     * Important: due to sync disk ops (writing raft meta) during probe request processing this timeout should be high
+     * enough to avoid test flakiness.
+     */
+    private static final int ELECTION_TIMEOUT_MILLIS = 600;
 
     private static final IgniteLogger LOG = IgniteLogger.forClass(TestCluster.class);
 
@@ -97,17 +104,17 @@ public class TestCluster {
     }
 
     public TestCluster(final String name, final String dataPath, final List<PeerId> peers) {
-        this(name, dataPath, peers, ELECTION_TIMEOUT);
+        this(name, dataPath, peers, ELECTION_TIMEOUT_MILLIS);
     }
 
     public TestCluster(final String name, final String dataPath, final List<PeerId> peers,
         final int electionTimeoutMs) {
-        this(name, dataPath, peers, new LinkedHashSet<>(), ELECTION_TIMEOUT, null);
+        this(name, dataPath, peers, new LinkedHashSet<>(), ELECTION_TIMEOUT_MILLIS, null);
     }
 
     public TestCluster(final String name, final String dataPath, final List<PeerId> peers,
         final LinkedHashSet<PeerId> learners, final int electionTimeoutMs) {
-        this(name, dataPath, peers, learners, ELECTION_TIMEOUT, null);
+        this(name, dataPath, peers, learners, ELECTION_TIMEOUT_MILLIS, null);
     }
 
     public TestCluster(final String name, final String dataPath, final List<PeerId> peers,
@@ -210,7 +217,7 @@ public class TestCluster {
 
             nodeOptions.setRpcClient(rpcClient);
 
-            var rpcServer = new TestIgniteRpcServer(clusterService, servers, nodeManager);
+            var rpcServer = new TestIgniteRpcServer(clusterService, servers, nodeManager, nodeOptions);
 
             clusterService.start();
 
@@ -306,7 +313,7 @@ public class TestCluster {
 
     public void clean(final Endpoint listenAddr) throws IOException {
         final String path = this.dataPath + File.separator + listenAddr.toString().replace(':', '_');
-        LOG.info("Clean dir: {0}", path);
+        LOG.info("Clean dir: {}", path);
         Utils.delete(new File(path));
     }
 
@@ -334,11 +341,18 @@ public class TestCluster {
         return null;
     }
 
+    /**
+     * Wait until a leader is elected.
+     * @throws InterruptedException
+     */
     public void waitLeader() throws InterruptedException {
+        Node node;
+
         while (true) {
-            final Node node = getLeader();
+            node = getLeader();
+
             if (node != null) {
-                return;
+                break;
             }
             else {
                 Thread.sleep(10);
@@ -363,25 +377,38 @@ public class TestCluster {
     }
 
     /**
-     * Ensure all peers leader is expectAddr
+     * Ensure all peers follow the leader
      *
-     * @param expectAddr expected address
+     * @param node The leader.
      * @throws InterruptedException if interrupted
      */
-    public void ensureLeader(final Endpoint expectAddr) throws InterruptedException {
+    public void ensureLeader(final Node node) throws InterruptedException {
         while (true) {
             this.lock.lock();
-            for (final Node node : this.nodes) {
-                final PeerId leaderId = node.getLeaderId();
-                if (!leaderId.getEndpoint().equals(expectAddr)) {
-                    this.lock.unlock();
+            try {
+                boolean wait = false;
+
+                for (final Node node0 : this.nodes) {
+                    final PeerId leaderId = node0.getLeaderId();
+
+                    if (leaderId == null || !leaderId.equals(node.getNodeId().getPeerId())) {
+                        wait = true;
+
+                        break;
+                    }
+                }
+
+                if (wait) {
                     Thread.sleep(10);
+
                     continue;
                 }
+                else
+                    return;
+            }
+            finally {
+                this.lock.unlock();
             }
-            // all is ready
-            this.lock.unlock();
-            return;
         }
     }
 
@@ -424,11 +451,16 @@ public class TestCluster {
         return ret;
     }
 
+    public void ensureSame() throws InterruptedException {
+        ensureSame(addr -> false);
+    }
+
     /**
+     * @param filter Predicate selecting the node endpoints to exclude from the comparison.
      * @return {@code True} if all FSM state are the same.
      * @throws InterruptedException
      */
-    public void ensureSame() throws InterruptedException {
+    public void ensureSame(Predicate<Endpoint> filter) throws InterruptedException {
         this.lock.lock();
 
         List<MockStateMachine> fsmList = new ArrayList<>(this.fsms.values());
@@ -439,18 +471,25 @@ public class TestCluster {
             return;
         }
 
-        LOG.info("Start ensureSame");
+        Node leader = getLeader();
+
+        assertNotNull(leader);
+
+        MockStateMachine first = fsms.get(leader.getNodeId().getPeerId());
+
+        LOG.info("Start ensureSame, leader={}", leader);
 
         try {
             assertTrue(TestUtils.waitForCondition(() -> {
-                MockStateMachine first = fsmList.get(0);
-
                 first.lock();
 
                 try {
-                    for (int i = 1; i < fsmList.size(); i++) {
+                    for (int i = 0; i < fsmList.size(); i++) {
                         MockStateMachine fsm = fsmList.get(i);
 
+                        if (fsm == first || filter.test(fsm.getAddress()))
+                            continue;
+
                         fsm.lock();
 
                         try {
@@ -482,7 +521,12 @@ public class TestCluster {
         }
         finally {
             this.lock.unlock();
-            LOG.info("End ensureSame");
+
+            Node leader1 = getLeader();
+
+            LOG.info("End ensureSame, leader={}", leader1);
+
+            assertSame("Leader shouldn't change while comparing fsms", leader, leader1);
         }
     }
 }
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractClientServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractClientServiceTest.java
index a722941..48d9d7f 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractClientServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractClientServiceTest.java
@@ -16,8 +16,8 @@
  */
 package org.apache.ignite.raft.jraft.rpc;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import org.apache.ignite.raft.jraft.JRaftUtils;
 import org.apache.ignite.raft.jraft.Status;
@@ -42,10 +42,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.when;
 
 @RunWith(value = MockitoJUnitRunner.class)
 public class AbstractClientServiceTest {
@@ -57,16 +58,16 @@ public class AbstractClientServiceTest {
     private final Endpoint endpoint = new Endpoint("localhost", 8081);
 
     @Before
-    public void setup() {
+    public void setup() throws Exception {
         this.rpcOptions = new RpcOptions();
         this.rpcOptions.setClientExecutor(JRaftUtils.createClientExecutor(this.rpcOptions, "unittest"));
         this.clientService = new MockClientService();
+        when(this.rpcClient.invokeAsync(any(), any(), any(), any(), anyLong())).thenReturn(new CompletableFuture<>());
         this.rpcOptions.setRpcClient(this.rpcClient);
         assertTrue(this.clientService.init(this.rpcOptions));
     }
 
     static class MockRpcResponseClosure<T extends Message> extends RpcResponseClosureAdapter<T> {
-
         CountDownLatch latch = new CountDownLatch(1);
 
         Status status;
@@ -79,33 +80,6 @@ public class AbstractClientServiceTest {
     }
 
     @Test
-    public void testCancel() throws Exception {
-        ArgumentCaptor<InvokeCallback> callbackArg = ArgumentCaptor.forClass(InvokeCallback.class);
-        PingRequest request = TestUtils.createPingRequest();
-
-        MockRpcResponseClosure<ErrorResponse> done = new MockRpcResponseClosure<>();
-        Future<Message> future = this.clientService.invokeWithDone(this.endpoint, request, done, -1);
-        Mockito.verify(this.rpcClient).invokeAsync(eq(this.endpoint), eq(request), Mockito.any(),
-            callbackArg.capture(), eq((long) this.rpcOptions.getRpcDefaultTimeout()));
-        InvokeCallback cb = callbackArg.getValue();
-        assertNotNull(cb);
-        assertNotNull(future);
-
-        assertNull(done.getResponse());
-        assertNull(done.status);
-        assertFalse(future.isDone());
-
-        future.cancel(true);
-        ErrorResponse response = (ErrorResponse) this.rpcResponseFactory.newResponse(null, Status.OK());
-        cb.complete(response, null);
-
-        // The closure should be notified with ECANCELED error code.
-        done.latch.await();
-        assertNotNull(done.status);
-        assertEquals(RaftError.ECANCELED.getNumber(), done.status.getCode());
-    }
-
-    @Test
     public void testInvokeWithDoneOK() throws Exception {
         ArgumentCaptor<InvokeCallback> callbackArg = ArgumentCaptor.forClass(InvokeCallback.class);
         PingRequest request = TestUtils.createPingRequest();
@@ -125,11 +99,6 @@ public class AbstractClientServiceTest {
         ErrorResponse response = (ErrorResponse) this.rpcResponseFactory.newResponse(null, Status.OK());
         cb.complete(response, null);
 
-        Message msg = future.get();
-        assertNotNull(msg);
-        assertTrue(msg instanceof ErrorResponse);
-        assertSame(msg, response);
-
         done.latch.await();
         assertNotNull(done.status);
         assertEquals(0, done.status.getCode());
@@ -155,14 +124,6 @@ public class AbstractClientServiceTest {
 
         assertTrue(future.isDone());
 
-        try {
-            future.get();
-            fail();
-        }
-        catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof RemotingException);
-        }
-
         done.latch.await();
         assertNotNull(done.status);
         assertEquals(RaftError.EINTERNAL.getNumber(), done.status.getCode());
@@ -188,21 +149,13 @@ public class AbstractClientServiceTest {
 
         cb.complete(null, new InvokeTimeoutException());
 
-        try {
-            future.get();
-            fail();
-        }
-        catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof InvokeTimeoutException);
-        }
-
         done.latch.await();
         assertNotNull(done.status);
         assertEquals(RaftError.ETIMEDOUT.getNumber(), done.status.getCode());
     }
 
     @Test
-    public void testInvokeWithDOneOnErrorResponse() throws Exception {
+    public void testInvokeWithDoneOnErrorResponse() throws Exception {
         final InvokeContext invokeCtx = new InvokeContext();
         final ArgumentCaptor<InvokeCallback> callbackArg = ArgumentCaptor.forClass(InvokeCallback.class);
         final CliRequests.GetPeersRequest request = CliRequests.GetPeersRequest.newBuilder() //
@@ -226,11 +179,6 @@ public class AbstractClientServiceTest {
             new Status(-1, "failed"));
         cb.complete(resp, null);
 
-        final Message msg = future.get();
-
-        assertTrue(msg instanceof ErrorResponse);
-        assertEquals(((ErrorResponse) msg).getErrorMsg(), "failed");
-
         done.latch.await();
         assertNotNull(done.status);
         assertTrue(!done.status.isOk());
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
index 9948db1..be786ed 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
@@ -21,14 +21,10 @@ import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.raft.jraft.util.Endpoint;
-import org.apache.ignite.raft.jraft.util.Utils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -97,16 +93,6 @@ public abstract class AbstractRpcTest {
     }
 
     @Test
-    public void testSyncProcessing() throws Exception {
-        RpcClient client = createClient();
-        Response1 resp1 = (Response1) client.invokeSync(endpoint, new Request1(), new InvokeContext(), 5000);
-        assertNotNull(resp1);
-
-        Response2 resp2 = (Response2) client.invokeSync(endpoint, new Request2(), new InvokeContext(), 5000);
-        assertNotNull(resp2);
-    }
-
-    @Test
     public void testAsyncProcessing() throws Exception {
         RpcClient client = createClient();
 
@@ -147,52 +133,6 @@ public abstract class AbstractRpcTest {
     }
 
     @Test
-    public void testRecordedSync() throws Exception {
-        RpcClientEx client1 = (RpcClientEx) createClient();
-        client1.recordMessages((a, b) -> true);
-
-        assertTrue(client1.checkConnection(endpoint));
-
-        Response1 resp1 = (Response1) client1.invokeSync(endpoint, new Request1(), 500);
-        Response2 resp2 = (Response2) client1.invokeSync(endpoint, new Request2(), 500);
-
-        assertNotNull(resp1);
-        assertNotNull(resp2);
-
-        Queue<Object[]> recorded = client1.recordedMessages();
-
-        assertEquals(4, recorded.size());
-        assertTrue(recorded.poll()[0] instanceof Request1);
-        assertTrue(recorded.poll()[0] instanceof Response1);
-        assertTrue(recorded.poll()[0] instanceof Request2);
-        assertTrue(recorded.poll()[0] instanceof Response2);
-    }
-
-    @Test
-    public void testRecordedSyncTimeout() {
-        RpcClientEx client1 = (RpcClientEx) createClient();
-        client1.recordMessages((a, b) -> true);
-
-        assertTrue(client1.checkConnection(endpoint));
-
-        try {
-            Request1 request = new Request1();
-            request.val = 10_000;
-            client1.invokeSync(endpoint, request, 500);
-
-            fail();
-        }
-        catch (Exception ignored) {
-            // Expected.
-        }
-
-        Queue<Object[]> recorded = client1.recordedMessages();
-
-        assertEquals(1, recorded.size());
-        assertTrue(recorded.poll()[0] instanceof Request1);
-    }
-
-    @Test
     public void testRecordedAsync() throws Exception {
         RpcClientEx client1 = (RpcClientEx) createClient();
         client1.recordMessages((a, b) -> true);
@@ -245,35 +185,6 @@ public abstract class AbstractRpcTest {
     }
 
     @Test
-    public void testBlockedSync() throws Exception {
-        RpcClientEx client1 = (RpcClientEx) createClient();
-        client1.blockMessages((msg, id) -> msg instanceof Request1);
-
-        assertTrue(client1.checkConnection(endpoint));
-
-        Response2 resp2 = (Response2) client1.invokeSync(endpoint, new Request2(), 500);
-
-        assertEquals(1, resp2.val);
-
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-
-        Future<Response1> resp = Utils.runInThread(executorService,
-            () -> (Response1) client1.invokeSync(endpoint, new Request1(), 30_000));
-
-        Thread.sleep(500);
-
-        Queue<Object[]> msgs = client1.blockedMessages();
-
-        assertEquals(1, msgs.size());
-
-        assertFalse(resp.isDone());
-
-        client1.stopBlock();
-
-        resp.get(5_000, TimeUnit.MILLISECONDS);
-    }
-
-    @Test
     public void testBlockedAsync() throws Exception {
         RpcClientEx client1 = (RpcClientEx) createClient();
         client1.blockMessages((msg, id) -> msg instanceof Request1);
@@ -282,7 +193,7 @@ public abstract class AbstractRpcTest {
 
         CompletableFuture<Object> resp = new CompletableFuture<>();
 
-        client1.invokeAsync(endpoint, new Request1(), (result, err) -> resp.complete(result), 30_000);
+        client1.invokeAsync(endpoint, new Request1(), null, (result, err) -> resp.complete(result), 30_000);
 
         Thread.sleep(500);
 
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
index d5370b9..6c6abe4 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessageSerializationRegistryImpl;
 import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
 import org.apache.ignite.raft.jraft.NodeManager;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
 import org.apache.ignite.raft.jraft.util.Endpoint;
 
@@ -34,11 +35,10 @@ public class IgniteRpcTest extends AbstractRpcTest {
     /** The counter. */
     private final AtomicInteger cntr = new AtomicInteger();
 
-    /** {@inheritDoc} */
     @Override public RpcServer<?> createServer(Endpoint endpoint) {
         ClusterService service = createService(endpoint.toString(), endpoint.getPort(), List.of());
 
-        var server = new TestIgniteRpcServer(service, List.of(), new NodeManager()) {
+        var server = new TestIgniteRpcServer(service, List.of(), new NodeManager(), new NodeOptions()) {
             @Override public void shutdown() {
                 super.shutdown();
 
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
index 9052f10..2dd9fa0 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
@@ -30,16 +30,17 @@ import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
  */
 public class TestIgniteRpcServer extends IgniteRpcServer {
     /**
-     * @param clusterService cluster service
-     * @param servers server list
-     * @param nodeManager node manager
+     * @param clusterService Cluster service.
+     * @param servers Server list.
+     * @param nodeManager Node manager.
+     * @param nodeOptions Node options.
      */
-    public TestIgniteRpcServer(ClusterService clusterService, List<String> servers, NodeManager nodeManager) {
+    public TestIgniteRpcServer(ClusterService clusterService, List<String> servers, NodeManager nodeManager, NodeOptions nodeOptions) {
         super(
             clusterService,
             nodeManager,
             new RaftClientMessagesFactory(),
-            JRaftUtils.createRequestExecutor(new NodeOptions())
+            JRaftUtils.createRequestExecutor(nodeOptions)
         );
     }
 }
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotReaderTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotReaderTest.java
index 5f7e05f..2299e01 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotReaderTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotReaderTest.java
@@ -37,7 +37,6 @@ import static org.junit.Assert.assertTrue;
 
 @RunWith(value = MockitoJUnitRunner.class)
 public class LocalSnapshotReaderTest extends BaseStorageTest {
-
     private LocalSnapshotReader reader;
     @Mock
     private LocalSnapshotStorage snapshotStorage;
@@ -48,13 +47,13 @@ public class LocalSnapshotReaderTest extends BaseStorageTest {
     @Before
     public void setup() throws Exception {
         super.setup();
-        this.path = this.path + File.separator + Snapshot.JRAFT_SNAPSHOT_PREFIX + snapshotIndex;
-        new File(path).mkdirs();
+        String snapPath = this.path + File.separator + Snapshot.JRAFT_SNAPSHOT_PREFIX + snapshotIndex;
+        new File(snapPath).mkdirs();
         this.table = new LocalSnapshotMetaTable(new RaftOptions());
         this.table.addFile("testFile", LocalFileMetaOutter.LocalFileMeta.newBuilder().setChecksum("test").build());
-        table.saveToFile(path + File.separator + Snapshot.JRAFT_SNAPSHOT_META_FILE);
+        table.saveToFile(snapPath + File.separator + Snapshot.JRAFT_SNAPSHOT_META_FILE);
         this.reader = new LocalSnapshotReader(snapshotStorage, null, new Endpoint("localhost", 8081),
-            new RaftOptions(), path);
+            new RaftOptions(), snapPath);
         assertTrue(this.reader.init(null));
     }