You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/03/01 17:06:09 UTC

[ignite-3] 02/02: IGNITE-14149 Migrate to ignite network.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch ignite-14149
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 399f640174b26a38e336024c212debf42b80ef01
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Mon Mar 1 20:05:49 2021 +0300

    IGNITE-14149 Migrate to ignite network.
---
 .../org/apache/ignite/network/NetworkCluster.java  |  15 ++-
 .../network/scalecube/ScaleCubeNetworkCluster.java |  19 +--
 modules/raft-client/pom.xml                        |   5 +
 .../org/apache/ignite/raft/ElectionPriority.java   |   2 +-
 .../main/java/org/apache/ignite/raft/PeerId.java   |  18 +--
 .../java/org/apache/ignite/raft/RaftException.java |  16 ---
 .../ignite/raft/client/RaftClientMessages.java     |  66 ++++------
 .../client/message/AddLearnersRequestImpl.java     |   2 +-
 .../raft/client/message/AddPeerRequestImpl.java    |   2 +-
 .../raft/client/message/ChangePeerRequestImpl.java |   2 +-
 .../raft/client/message/GetLeaderRequestImpl.java  |   2 +-
 .../raft/client/message/GetPeersRequestImpl.java   |   2 +-
 .../raft/client/message/PingRequestImpl.java       |  21 ----
 ...rFactory.java => RaftClientMessageFactory.java} |   6 +-
 ...tory.java => RaftClientMessageFactoryImpl.java} |  14 +--
 .../client/message/RemoveLearnersRequestImpl.java  |   2 +-
 .../raft/client/message/RemovePeerRequestImpl.java |   2 +-
 .../client/message/ResetLearnersRequestImpl.java   |   2 +-
 .../raft/client/message/ResetPeerRequestImpl.java  |   2 +-
 .../raft/client/message/SnapshotRequestImpl.java   |   2 +-
 .../raft/client/message/StatusResponseImpl.java    |  32 -----
 .../client/message/TransferLeaderRequestImpl.java  |   2 +-
 .../raft/client/message/UserRequestImpl.java       |  10 +-
 .../ignite/raft/client/rpc/RaftGroupRpcClient.java |  30 +++--
 .../client/rpc/impl/RaftGroupRpcClientImpl.java    | 135 +++++----------------
 .../service/RaftGroupClientRequestService.java     |   2 +-
 .../client/service/RaftGroupManagmentService.java  |   8 +-
 .../impl/RaftGroupClientRequestServiceImpl.java    |  12 +-
 .../impl/RaftGroupManagementServiceImpl.java       |   2 +-
 .../org/apache/ignite/raft/rpc/InvokeCallback.java |  24 ----
 .../java/org/apache/ignite/raft/rpc/Message.java   |   4 -
 .../main/java/org/apache/ignite/raft/rpc/Node.java |  10 --
 .../java/org/apache/ignite/raft/rpc/NodeImpl.java  |  49 --------
 .../apache/ignite/raft/rpc/RaftGroupMessage.java   |   8 --
 .../java/org/apache/ignite/raft/rpc/RpcClient.java |  59 ---------
 .../org/apache/ignite/raft/client/MockUtils.java   |  71 +++++++++++
 .../raft/client/rpc/RaftGroupRpcClientTest.java    | 118 ++++++------------
 .../service/RaftGroupClientRequestServiceTest.java | 104 +++-------------
 38 files changed, 257 insertions(+), 625 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
index 5bdd576..f271b53 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
@@ -17,6 +17,7 @@
 package org.apache.ignite.network;
 
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 
 /**
@@ -59,7 +60,19 @@ public interface NetworkCluster {
      * @param member Network member which should receive the message.
      * @param msg Message which should be delivered.
      */
-    Future<?> guaranteedSend(NetworkMember member, Object msg);
+    Future<?> send(NetworkMember member, Object msg);
+
+    /**
+     * Sends asynchronously a message with same guarantees as for {@link #send(NetworkMember, Object)} and
+     * returns a response (RPC style).
+     *
+     * @param member Network member which should receive the message.
+     * @param msg A message.
+     * @param timeout Waiting for response timeout in milliseconds.
+     * @param <R> Expected response type.
+     * @return A future holding the response or error if the expected response was not received.
+     */
+    <R> CompletableFuture<R> sendWithResponse(NetworkMember member, Object msg, long timeout);
 
     /**
      * Add provider which allows to get configured handlers for different cluster events(ex. received message).
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
index 0dc1087..f2ebbac 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
@@ -16,9 +16,12 @@
  */
 package org.apache.ignite.network.scalecube;
 
+import io.scalecube.cluster.transport.api.Message;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import io.scalecube.cluster.Cluster;
 import org.apache.ignite.network.NetworkCluster;
@@ -29,6 +32,7 @@ import org.apache.ignite.network.NetworkMessageHandler;
 import org.apache.ignite.network.MessageHandlerHolder;
 
 import static io.scalecube.cluster.transport.api.Message.fromData;
+import static java.time.Duration.ofMillis;
 
 /**
  * Implementation of {@link NetworkCluster} based on ScaleCube.
@@ -84,15 +88,14 @@ public class ScaleCubeNetworkCluster implements NetworkCluster {
     }
 
     /** {@inheritDoc} */
-    @Override public Future<?> guaranteedSend(NetworkMember member, Object msg) {
-        cluster.send(memberResolver.resolveMember(member), fromData(msg))
-            .block();
-
-        CompletableFuture<Object> future = new CompletableFuture<>();
-
-        future.complete(null);
+    @Override public Future<?> send(NetworkMember member, Object msg) {
+        return cluster.send(memberResolver.resolveMember(member), fromData(msg)).toFuture();
+    }
 
-        return future;
+    /** {@inheritDoc} */
+    @Override public <R> CompletableFuture<R> sendWithResponse(NetworkMember member, Object msg, long timeout) {
+        return cluster.requestResponse(memberResolver.resolveMember(member), fromData(msg))
+            .timeout(ofMillis(timeout)).toFuture().thenApply(m -> m.data());
     }
 
     /** {@inheritDoc} */
diff --git a/modules/raft-client/pom.xml b/modules/raft-client/pom.xml
index 6242962..06be309 100644
--- a/modules/raft-client/pom.xml
+++ b/modules/raft-client/pom.xml
@@ -36,6 +36,11 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-network</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-engine</artifactId>
             <scope>test</scope>
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java
index 376a589..8dd5ddf 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java
@@ -21,7 +21,7 @@ package org.apache.ignite.raft;
  */
 public class ElectionPriority {
     /**
-     * Priority -1 represents this node disabled the priority election function.
+     * Priority -1 means this node has disabled the priority election function.
      */
     public static final int DISABLED = -1;
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
index 7b5fec0..9587c87 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
@@ -17,21 +17,21 @@
 package org.apache.ignite.raft;
 
 import java.io.Serializable;
-import org.apache.ignite.raft.rpc.Node;
+import org.apache.ignite.network.NetworkMember;
 
 /**
  * Represents a participant in a replicating group.
  */
-public class PeerId implements Serializable {
+public final class PeerId implements Serializable {
     private static final long serialVersionUID = 8083529734784884641L;
 
     /**
-     * Owning node.
+     * Cluster node for peer.
      */
-    private final Node node;
+    private final NetworkMember node;
 
     /**
-     * Node's local priority value, if node don't support priority election, this value is -1.
+     * Peer's local priority value, if node don't support priority election, this value is -1.
      */
     private final int priority;
 
@@ -40,17 +40,17 @@ public class PeerId implements Serializable {
         this.priority = peer.getPriority();
     }
 
-    public PeerId(Node node) {
+    public PeerId(NetworkMember node) {
         this(node, ElectionPriority.DISABLED);
     }
 
-    public PeerId(final Node node, final int priority) {
+    public PeerId(final NetworkMember node, final int priority) {
         super();
         this.node = node;
         this.priority = priority;
     }
 
-    public Node getNode() {
+    public NetworkMember getNode() {
         return this.node;
     }
 
@@ -77,6 +77,6 @@ public class PeerId implements Serializable {
     }
 
     @Override public String toString() {
-        return node.id() + ":" + priority;
+        return node.name() + ":" + priority;
     }
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/RaftException.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/RaftException.java
deleted file mode 100644
index f63f6f0..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/RaftException.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.ignite.raft;
-
-/** */
-public class RaftException extends Exception {
-    private final int statusCode;
-
-    public RaftException(int statusCode, String statusMsg) {
-        super(statusMsg);
-
-        this.statusCode = statusCode;
-    }
-
-    public int getStatusCode() {
-        return statusCode;
-    }
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientMessages.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientMessages.java
index 3f44685..c809a0d 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientMessages.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientMessages.java
@@ -21,8 +21,6 @@ package org.apache.ignite.raft.client;
 
 import java.util.List;
 import org.apache.ignite.raft.PeerId;
-import org.apache.ignite.raft.rpc.Message;
-import org.apache.ignite.raft.rpc.RaftGroupMessage;
 
 /**
  *
@@ -31,21 +29,7 @@ public final class RaftClientMessages {
     private RaftClientMessages() {
     }
 
-    public interface StatusResponse extends Message {
-        int getStatusCode();
-
-        String getStatusMsg();
-
-        interface Builder {
-            Builder setStatusCode(int code);
-
-            Builder setStatusMsg(String msg);
-
-            StatusResponse build();
-        }
-    }
-
-    public interface PingRequest extends Message {
+    public interface PingRequest {
         long getSendTimestamp();
 
         interface Builder {
@@ -55,7 +39,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface AddPeerRequest extends RaftGroupMessage {
+    public interface AddPeerRequest {
         PeerId getPeerId();
 
         interface Builder {
@@ -67,7 +51,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface AddPeerResponse extends Message {
+    public interface AddPeerResponse {
         List<PeerId> getOldPeersList();
 
         List<PeerId> getNewPeersList();
@@ -81,7 +65,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface RemovePeerRequest extends RaftGroupMessage {
+    public interface RemovePeerRequest {
         PeerId getPeerId();
 
         interface Builder {
@@ -93,7 +77,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface RemovePeerResponse extends Message {
+    public interface RemovePeerResponse {
         List<PeerId> getOldPeersList();
 
         List<PeerId> getNewPeersList();
@@ -107,7 +91,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface ChangePeersRequest extends RaftGroupMessage {
+    public interface ChangePeersRequest {
         List<PeerId> getNewPeersList();
 
         public interface Builder {
@@ -119,7 +103,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface ChangePeersResponse extends Message {
+    public interface ChangePeersResponse {
         List<PeerId> getOldPeersList();
 
         List<PeerId> getNewPeersList();
@@ -133,7 +117,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface SnapshotRequest extends RaftGroupMessage {
+    public interface SnapshotRequest {
         public interface Builder {
             Builder setGroupId(String groupId);
 
@@ -141,7 +125,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface ResetPeerRequest extends RaftGroupMessage {
+    public interface ResetPeerRequest {
         List<PeerId> getNewPeersList();
 
         public interface Builder {
@@ -153,7 +137,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface TransferLeaderRequest extends RaftGroupMessage {
+    public interface TransferLeaderRequest {
         PeerId getPeerId();
 
         public interface Builder {
@@ -163,7 +147,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface GetLeaderRequest extends RaftGroupMessage {
+    public interface GetLeaderRequest {
         public interface Builder {
             Builder setGroupId(String groupId);
 
@@ -171,7 +155,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface GetLeaderResponse extends Message {
+    public interface GetLeaderResponse {
         PeerId getLeaderId();
 
         public interface Builder {
@@ -181,7 +165,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface GetPeersRequest extends RaftGroupMessage {
+    public interface GetPeersRequest {
         boolean getOnlyAlive();
 
         public interface Builder {
@@ -193,7 +177,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface GetPeersResponse extends Message {
+    public interface GetPeersResponse {
         List<PeerId> getPeersList();
 
         List<PeerId> getLearnersList();
@@ -207,7 +191,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface AddLearnersRequest extends RaftGroupMessage {
+    public interface AddLearnersRequest {
         List<PeerId> getLearnersList();
 
         public interface Builder {
@@ -219,7 +203,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface RemoveLearnersRequest extends RaftGroupMessage {
+    public interface RemoveLearnersRequest {
         List<PeerId> getLearnersList();
 
         public interface Builder {
@@ -231,7 +215,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface ResetLearnersRequest extends RaftGroupMessage {
+    public interface ResetLearnersRequest {
         List<PeerId> getLearnersList();
 
         public interface Builder {
@@ -243,7 +227,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface LearnersOpResponse extends Message {
+    public interface LearnersOpResponse  {
         List<PeerId> getOldLearnersList();
 
         List<PeerId> getNewLearnersList();
@@ -257,19 +241,21 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface UserRequest<T> extends RaftGroupMessage {
-        T request();
+    public interface UserRequest {
+        Object request();
 
-        public interface Builder<T> {
-            Builder setRequest(T request);
+        String getGroupId();
+
+        public interface Builder {
+            Builder setRequest(Object request);
 
             Builder setGroupId(String groupId);
 
-            UserRequest<T> build();
+            UserRequest build();
         }
     }
 
-    public interface UserResponse<T> extends Message {
+    public interface UserResponse<T> {
         T response();
 
         public interface Builder<T> {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequestImpl.java
index 2bb1125..dec9076 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequestImpl.java
@@ -10,7 +10,7 @@ public class AddLearnersRequestImpl implements RaftClientMessages.AddLearnersReq
     private PeerId leaderId;
     private List<PeerId> learnersList = new ArrayList<>();
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeerRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeerRequestImpl.java
index ba6d2c8..d18ee14 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeerRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeerRequestImpl.java
@@ -8,7 +8,7 @@ class AddPeerRequestImpl implements RaftClientMessages.AddPeerRequest, RaftClien
     private PeerId leaderId;
     private PeerId peerId;
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeerRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeerRequestImpl.java
index 47d3abc..4c611c7 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeerRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeerRequestImpl.java
@@ -9,7 +9,7 @@ class ChangePeerRequestImpl implements RaftClientMessages.ChangePeersRequest, Ra
     private String groupId;
     private List<PeerId> newPeersList = new ArrayList<>();
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java
index dbc7566..4162f81 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java
@@ -6,7 +6,7 @@ import org.apache.ignite.raft.client.RaftClientMessages;
 public class GetLeaderRequestImpl implements RaftClientMessages.GetLeaderRequest, RaftClientMessages.GetLeaderRequest.Builder {
     private String groupId;
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequestImpl.java
index ea50a57..d3a2d99 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequestImpl.java
@@ -6,7 +6,7 @@ class GetPeersRequestImpl implements RaftClientMessages.GetPeersRequest, RaftCli
     private String groupId;
     private boolean onlyAlive;
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/PingRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/PingRequestImpl.java
deleted file mode 100644
index b0c0a78..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/PingRequestImpl.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package org.apache.ignite.raft.client.message;
-
-import org.apache.ignite.raft.client.RaftClientMessages;
-
-class PingRequestImpl implements RaftClientMessages.PingRequest, RaftClientMessages.PingRequest.Builder {
-    private long sendTimestamp;
-
-    @Override public long getSendTimestamp() {
-        return sendTimestamp;
-    }
-
-    @Override public Builder setSendTimestamp(long timestamp) {
-        this.sendTimestamp = timestamp;
-
-        return this;
-    }
-
-    @Override public RaftClientMessages.PingRequest build() {
-        return this;
-    }
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ClientMessageBuilderFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactory.java
similarity index 89%
rename from modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ClientMessageBuilderFactory.java
rename to modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactory.java
index 4351a67..95a350d 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ClientMessageBuilderFactory.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactory.java
@@ -3,11 +3,7 @@ package org.apache.ignite.raft.client.message;
 import org.apache.ignite.raft.client.RaftClientMessages;
 
 /** */
-public interface ClientMessageBuilderFactory {
-    RaftClientMessages.PingRequest.Builder createPingRequest();
-
-    RaftClientMessages.StatusResponse.Builder createStatusResponse();
-
+public interface RaftClientMessageFactory {
     RaftClientMessages.AddPeerRequest.Builder createAddPeerRequest();
 
     RaftClientMessages.AddPeerResponse.Builder createAddPeerResponse();
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageBuilderFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactoryImpl.java
similarity index 85%
rename from modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageBuilderFactory.java
rename to modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactoryImpl.java
index 2d47315..6e170ae 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageBuilderFactory.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactoryImpl.java
@@ -3,18 +3,10 @@ package org.apache.ignite.raft.client.message;
 import org.apache.ignite.raft.client.RaftClientMessages;
 
 /**
- * Raft client message builders factory.
+ * Raft client message factory.
  */
-public class RaftClientMessageBuilderFactory implements ClientMessageBuilderFactory {
-    public static RaftClientMessageBuilderFactory INSTANCE = new RaftClientMessageBuilderFactory();
-
-    @Override public RaftClientMessages.PingRequest.Builder createPingRequest() {
-        return new PingRequestImpl();
-    }
-
-    @Override public RaftClientMessages.StatusResponse.Builder createStatusResponse() {
-        return new StatusResponseImpl();
-    }
+public class RaftClientMessageFactoryImpl implements RaftClientMessageFactory {
+    public static RaftClientMessageFactoryImpl INSTANCE = new RaftClientMessageFactoryImpl();
 
     @Override public RaftClientMessages.AddPeerRequest.Builder createAddPeerRequest() {
         return new AddPeerRequestImpl();
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequestImpl.java
index e55aefc..b319b69 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequestImpl.java
@@ -9,7 +9,7 @@ class RemoveLearnersRequestImpl implements RaftClientMessages.RemoveLearnersRequ
     private String groupId;
     private List<PeerId> learnersList = new ArrayList<>();
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeerRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeerRequestImpl.java
index 0e4934d..a6d57b1 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeerRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeerRequestImpl.java
@@ -7,7 +7,7 @@ class RemovePeerRequestImpl implements RaftClientMessages.RemovePeerRequest, Raf
     private String groupId;
     private PeerId peerId;
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetLearnersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetLearnersRequestImpl.java
index 340a39f..e1d144d 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetLearnersRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetLearnersRequestImpl.java
@@ -9,7 +9,7 @@ class ResetLearnersRequestImpl implements RaftClientMessages.ResetLearnersReques
     private String groupId;
     private List<PeerId> learnersList = new ArrayList<>();
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetPeerRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetPeerRequestImpl.java
index 6c8d4bc..4ead585 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetPeerRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetPeerRequestImpl.java
@@ -9,7 +9,7 @@ class ResetPeerRequestImpl implements RaftClientMessages.ResetPeerRequest, RaftC
     private String groupId;
     private List<PeerId> newPeersList = new ArrayList<>();
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequestImpl.java
index 08a09dc..2b2eea6 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequestImpl.java
@@ -5,7 +5,7 @@ import org.apache.ignite.raft.client.RaftClientMessages;
 class SnapshotRequestImpl implements RaftClientMessages.SnapshotRequest, RaftClientMessages.SnapshotRequest.Builder {
     private String groupId;
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/StatusResponseImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/StatusResponseImpl.java
deleted file mode 100644
index 6ae0b6b..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/StatusResponseImpl.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package org.apache.ignite.raft.client.message;
-
-import org.apache.ignite.raft.client.RaftClientMessages;
-
-class StatusResponseImpl implements RaftClientMessages.StatusResponse, RaftClientMessages.StatusResponse.Builder {
-    private int errorCode;
-    private String errorMsg = "";
-
-    @Override public int getStatusCode() {
-        return errorCode;
-    }
-
-    @Override public Builder setStatusCode(int errorCode) {
-        this.errorCode = errorCode;
-
-        return this;
-    }
-
-    @Override public String getStatusMsg() {
-        return errorMsg;
-    }
-
-    @Override public Builder setStatusMsg(String errorMsg) {
-        this.errorMsg = errorMsg;
-
-        return this;
-    }
-
-    @Override public RaftClientMessages.StatusResponse build() {
-        return this;
-    }
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeaderRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeaderRequestImpl.java
index 2cc6592..24957df 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeaderRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeaderRequestImpl.java
@@ -7,7 +7,7 @@ class TransferLeaderRequestImpl implements RaftClientMessages.TransferLeaderRequ
     private String groupId;
     private PeerId peerId;
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/UserRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/UserRequestImpl.java
index 06d667e..93cc463 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/UserRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/UserRequestImpl.java
@@ -2,11 +2,11 @@ package org.apache.ignite.raft.client.message;
 
 import org.apache.ignite.raft.client.RaftClientMessages;
 
-public class UserRequestImpl<T> implements RaftClientMessages.UserRequest<T>, RaftClientMessages.UserRequest.Builder<T> {
-    private T request;
+public class UserRequestImpl<T> implements RaftClientMessages.UserRequest, RaftClientMessages.UserRequest.Builder {
+    private Object request;
     private String groupId;
 
-    @Override public T request() {
+    @Override public Object request() {
         return request;
     }
 
@@ -16,13 +16,13 @@ public class UserRequestImpl<T> implements RaftClientMessages.UserRequest<T>, Ra
         return this;
     }
 
-    @Override public Builder setRequest(T request) {
+    @Override public Builder setRequest(Object request) {
         this.request = request;
 
         return this;
     }
 
-    @Override public RaftClientMessages.UserRequest<T> build() {
+    @Override public RaftClientMessages.UserRequest build() {
         return this;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
index 14384e9..382e953 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
@@ -17,11 +17,12 @@
 package org.apache.ignite.raft.client.rpc;
 
 import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
 import org.apache.ignite.raft.State;
 import org.apache.ignite.raft.PeerId;
-import org.apache.ignite.raft.client.message.ClientMessageBuilderFactory;
-import org.apache.ignite.raft.rpc.Message;
-import org.apache.ignite.raft.rpc.RaftGroupMessage;
+import org.apache.ignite.raft.client.RaftClientMessages.UserRequest;
+import org.apache.ignite.raft.client.RaftClientMessages.UserResponse;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
 
 import static org.apache.ignite.raft.client.RaftClientMessages.AddLearnersRequest;
 import static org.apache.ignite.raft.client.RaftClientMessages.AddPeerRequest;
@@ -35,30 +36,27 @@ import static org.apache.ignite.raft.client.RaftClientMessages.RemovePeerRespons
 import static org.apache.ignite.raft.client.RaftClientMessages.ResetLearnersRequest;
 import static org.apache.ignite.raft.client.RaftClientMessages.ResetPeerRequest;
 import static org.apache.ignite.raft.client.RaftClientMessages.SnapshotRequest;
-import static org.apache.ignite.raft.client.RaftClientMessages.StatusResponse;
 import static org.apache.ignite.raft.client.RaftClientMessages.TransferLeaderRequest;
 
 /**
- * Low-level raft group RPC client.
- * <p>
- * Additionally maintains raft group state.
+ * Replicating group RPC client.
  */
 public interface RaftGroupRpcClient {
     /**
      * @param groupId Group id.
      * @return Group state snapshot.
      */
-    State state(String groupId);
+    @NotNull State state(String groupId);
 
     /**
-     * Refreshes a group leader.
+     * Refreshes a replicating group leader.
      * @param groupId Group id.
      * @return A future.
      */
     CompletableFuture<PeerId> refreshLeader(String groupId);
 
     /**
-     * Refreshes group members (but without a leader).
+     * Refreshes a replicating group members (except a leader).
      * @param groupId Group id.
      * @return A future.
      */
@@ -88,7 +86,7 @@ public interface RaftGroupRpcClient {
      * @param request   request data
      * @return A future with result.
      */
-    CompletableFuture<StatusResponse> resetPeers(PeerId peerId, ResetPeerRequest request);
+    CompletableFuture<Void> resetPeers(PeerId peerId, ResetPeerRequest request);
 
     /**
      * Takes a local snapshot.
@@ -98,7 +96,7 @@ public interface RaftGroupRpcClient {
      * @param done      callback
      * @return a future with result
      */
-    CompletableFuture<StatusResponse> snapshot(PeerId peerId, SnapshotRequest request);
+    CompletableFuture<Void> snapshot(PeerId peerId, SnapshotRequest request);
 
     /**
      * Change peers.
@@ -148,20 +146,20 @@ public interface RaftGroupRpcClient {
      * @param done      callback
      * @return a future with result
      */
-    CompletableFuture<StatusResponse> transferLeader(TransferLeaderRequest request);
+    CompletableFuture<Void> transferLeader(TransferLeaderRequest request);
 
     /**
-     * Performs a custom action defined by specific request on the raft group leader.
+     * Performs a user action defined by specific request to the raft group leader.
      *
      * @param endpoint  server address
      * @param request   request data
      * @param done      callback
      * @return a future with result
      */
-    <R extends Message> CompletableFuture<R> sendCustom(RaftGroupMessage request);
+    <R> CompletableFuture<UserResponse<R>> sendUserRequest(UserRequest request);
 
     /**
      * @return A message builder factory.
      */
-    ClientMessageBuilderFactory factory();
+    RaftClientMessageFactory factory();
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
index 4599556..4f46ed9 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
@@ -6,53 +6,43 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
+import org.apache.ignite.network.NetworkCluster;
+import org.apache.ignite.network.NetworkMember;
 import org.apache.ignite.raft.PeerId;
-import org.apache.ignite.raft.RaftException;
 import org.apache.ignite.raft.State;
 import org.apache.ignite.raft.client.RaftClientMessages;
 import org.apache.ignite.raft.client.RaftClientMessages.GetLeaderResponse;
-import org.apache.ignite.raft.client.RaftClientMessages.StatusResponse;
-import org.apache.ignite.raft.client.message.ClientMessageBuilderFactory;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
 import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
-import org.apache.ignite.raft.rpc.InvokeCallback;
-import org.apache.ignite.raft.rpc.Message;
-import org.apache.ignite.raft.rpc.Node;
-import org.apache.ignite.raft.rpc.RaftGroupMessage;
-import org.apache.ignite.raft.rpc.RpcClient;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 
 public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
-    private final ExecutorService executor;
-    private final int defaultTimeout;
-    private final RpcClient rpcClient;
+    private final NetworkCluster cluster;
 
     /** Where to ask for initial configuration. */
-    private final Set<Node> initialCfgNodes;
+    private final Set<NetworkMember> initialCfgNodes;
+
+    /** */
+    private final RaftClientMessageFactory factory;
 
     /** */
-    private final ClientMessageBuilderFactory factory;
+    private final int defaultTimeout;
 
     private Map<String, StateImpl> states = new ConcurrentHashMap<>();
 
     /**
      * Accepts dependencies in constructor.
-     * @param rpcClient
-     * @param defaultTimeout
+     * @param cluster Cluster.
+     * @param defaultTimeout Default request timeout.
      * @param initialCfgNode Initial configuration nodes.
      */
-    public RaftGroupRpcClientImpl(RpcClient rpcClient, ClientMessageBuilderFactory factory, int defaultTimeout, Set<Node> initialCfgNodes) {
+    public RaftGroupRpcClientImpl(NetworkCluster cluster, RaftClientMessageFactory factory, int defaultTimeout, Set<NetworkMember> initialCfgNodes) {
         this.defaultTimeout = defaultTimeout;
-        this.rpcClient = rpcClient;
+        this.cluster = cluster;
         this.factory = factory;
         this.initialCfgNodes = new HashSet<>(initialCfgNodes);
-        executor = Executors.newWorkStealingPool();
     }
 
     @Override public State state(String groupId) {
@@ -60,27 +50,17 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
     }
 
     private StateImpl getState(String groupId) {
-        StateImpl newState = new StateImpl();
-
-        StateImpl state = states.putIfAbsent(groupId, newState);
-
-        if (state == null)
-            state = newState;
-
-        return state;
+        return states.computeIfAbsent(groupId, k -> new StateImpl());
     }
 
     @Override public CompletableFuture<PeerId> refreshLeader(String groupId) {
         StateImpl state = getState(groupId);
 
-        return refreshLeader(initialCfgNodes.iterator().next(), groupId).
-            thenApply(resp -> {
-                PeerId leaderId = resp.getLeaderId();
+        RaftClientMessages.GetLeaderRequest req = factory.createGetLeaderRequest().setGroupId(groupId).build();
 
-                state.leader = leaderId;
+        CompletableFuture<GetLeaderResponse> fut = cluster.sendWithResponse(initialCfgNodes.iterator().next(), req, defaultTimeout);
 
-                return leaderId;
-            });
+        return fut.thenApply(resp -> state.leader = resp.getLeaderId());
     }
 
     @Override public CompletableFuture<State> refreshMembers(String groupId) {
@@ -95,11 +75,11 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
         return null;
     }
 
-    @Override public CompletableFuture<StatusResponse> resetPeers(PeerId peerId, RaftClientMessages.ResetPeerRequest request) {
+    @Override public CompletableFuture<Void> resetPeers(PeerId peerId, RaftClientMessages.ResetPeerRequest request) {
         return null;
     }
 
-    @Override public CompletableFuture<StatusResponse> snapshot(PeerId peerId, RaftClientMessages.SnapshotRequest request) {
+    @Override public CompletableFuture<Void> snapshot(PeerId peerId, RaftClientMessages.SnapshotRequest request) {
         return null;
     }
 
@@ -119,82 +99,23 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
         return null;
     }
 
-    @Override public CompletableFuture<StatusResponse> transferLeader(RaftClientMessages.TransferLeaderRequest request) {
+    @Override public CompletableFuture<Void> transferLeader(RaftClientMessages.TransferLeaderRequest request) {
         return null;
     }
 
-    private CompletableFuture<GetLeaderResponse> refreshLeader(Node node, String groupId) {
-        StateImpl state = getState(groupId);
-
-        while(true) {
-            CompletableFuture<GetLeaderResponse> fut = state.updateFutRef.get();
-
-            if (fut != null)
-                return fut;
-
-            if (state.updateFutRef.compareAndSet(null, (fut = new CompletableFuture<>()))) {
-                RaftClientMessages.GetLeaderRequest req = factory.createGetLeaderRequest().setGroupId(groupId).build();
+    @Override public CompletableFuture<RaftClientMessages.UserResponse> sendUserRequest(RaftClientMessages.UserRequest request) {
+        if (request.getGroupId() == null)
+            throw new IllegalArgumentException("groupId is required");
 
-                CompletableFuture<GetLeaderResponse> finalFut = fut;
-
-                rpcClient.invokeAsync(node, req, new InvokeCallback<GetLeaderResponse>() {
-                    @Override public void complete(GetLeaderResponse response, Throwable err) {
-                        if (err != null)
-                            finalFut.completeExceptionally(err);
-                        else
-                            finalFut.complete(response);
-
-                        state.updateFutRef.set(null);
-                    }
-                }, executor, defaultTimeout);
-
-                return fut;
-            }
-        }
-    }
-
-    @Override public <R extends Message> CompletableFuture<R> sendCustom(RaftGroupMessage request) {
         State state = state(request.getGroupId());
 
-        CompletableFuture<R> fut = new CompletableFuture<>();
-
-        fut.orTimeout(defaultTimeout, TimeUnit.MILLISECONDS);
-
         CompletableFuture<PeerId> fut0 = state.leader() == null ?
             refreshLeader(request.getGroupId()) : completedFuture(state.leader());
 
-        fut0.whenComplete(new BiConsumer<PeerId, Throwable>() {
-            @Override public void accept(PeerId peerId, Throwable error) {
-                if (error == null) {
-                    rpcClient.invokeAsync(peerId.getNode(), request, new InvokeCallback<R>() {
-                        @Override public void complete(R response, Throwable err) {
-                            if (err != null)
-                                fut.completeExceptionally(err);
-                            else {
-                                if (response instanceof StatusResponse) {
-                                    StatusResponse resp = (StatusResponse) response;
-
-                                    // Translate error response to exception with the code.
-                                    if (resp.getStatusCode() != 0)
-                                        fut.completeExceptionally(new RaftException(resp.getStatusCode(), resp.getStatusMsg()));
-                                    else
-                                        fut.complete(response);
-                                } else
-                                    fut.complete(response);
-                            }
-                        }
-                    }, executor, defaultTimeout);
-                }
-                else {
-                    fut.completeExceptionally(error);
-                }
-            }
-        });
-
-        return fut;
-    }
-
-    @Override public ClientMessageBuilderFactory factory() {
+        return fut0.thenCompose(peerId -> cluster.sendWithResponse(peerId.getNode(), request, defaultTimeout));
+    }
+
+    @Override public RaftClientMessageFactory factory() {
         return this.factory;
     }
 
@@ -205,8 +126,6 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
 
         private volatile List<PeerId> learners;
 
-        private AtomicReference<CompletableFuture<GetLeaderResponse>> updateFutRef = new AtomicReference<>();
-
         @Override public @Nullable PeerId leader() {
             return leader;
         }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java
index 2c5e04a..a4a8a59 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java
@@ -13,5 +13,5 @@ public interface RaftGroupClientRequestService {
      * @param <R> Response.
      * @return A future.
      */
-    <T, R> CompletableFuture<R> submit(T request);
+    <R> CompletableFuture<R> submit(Object request);
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupManagmentService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupManagmentService.java
index d868dc9..2bd8fac 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupManagmentService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupManagmentService.java
@@ -57,7 +57,7 @@ public interface RaftGroupManagmentService {
      * @param peerId  Peer id.
      * @param request   request data
      * @param done      callback
-     * @return a future with result
+     * @return a future with result.
      */
     CompletableFuture<Void> snapshot(PeerId peerId);
 
@@ -77,7 +77,7 @@ public interface RaftGroupManagmentService {
      * @param endpoint  server address
      * @param request   request data
      * @param done      callback
-     * @return a future with result
+     * @return a future with result.
      */
     CompletableFuture<PeersChangeState> addLearners(List<PeerId> peers);
 
@@ -97,7 +97,7 @@ public interface RaftGroupManagmentService {
      * @param endpoint  server address
      * @param request   request data
      * @param done      callback
-     * @return a future with result
+     * @return a future with result.
      */
     CompletableFuture<PeersChangeState> resetLearners(List<PeerId> peers);
 
@@ -107,7 +107,7 @@ public interface RaftGroupManagmentService {
      * @param endpoint  server address
      * @param request   request data
      * @param done      callback
-     * @return a future with result
+     * @return a future with result.
      */
     CompletableFuture<Void> transferLeader(PeerId newLeader);
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java
index 16d63c4..c5b7c25 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java
@@ -1,11 +1,9 @@
 package org.apache.ignite.raft.client.service.impl;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
 import org.apache.ignite.raft.client.RaftClientMessages;
 import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
 import org.apache.ignite.raft.client.service.RaftGroupClientRequestService;
-import org.apache.ignite.raft.rpc.Message;
 
 public class RaftGroupClientRequestServiceImpl implements RaftGroupClientRequestService {
     private final RaftGroupRpcClient rpcClient;
@@ -16,16 +14,12 @@ public class RaftGroupClientRequestServiceImpl implements RaftGroupClientRequest
         this.groupId = groupId;
     }
 
-    @Override public <T, R> CompletableFuture<R> submit(T request) {
+    @Override public <R> CompletableFuture<R> submit(Object request) {
         RaftClientMessages.UserRequest r =
             rpcClient.factory().createUserRequest().setRequest(request).setGroupId(groupId).build();
 
-        return rpcClient.sendCustom(r).thenApply(new Function<Message, R>() {
-            @Override public R apply(Message message) {
-                RaftClientMessages.UserResponse<R> resp = (RaftClientMessages.UserResponse<R>) message;
+        CompletableFuture<RaftClientMessages.UserResponse<R>> completableFuture = rpcClient.sendUserRequest(r);
 
-                return resp.response();
-            }
-        });
+        return completableFuture.thenApply(resp -> resp.response());
     }
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java
index a70c1fb..28e28c8 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java
@@ -8,7 +8,7 @@ import org.apache.ignite.raft.client.service.RaftGroupManagmentService;
 import org.jetbrains.annotations.Nullable;
 
 public class RaftGroupManagementServiceImpl implements RaftGroupManagmentService {
-    private RaftGroupRpcClient rpcClient;
+    private final RaftGroupRpcClient rpcClient;
 
     public RaftGroupManagementServiceImpl(RaftGroupRpcClient rpcClient) {
         this.rpcClient = rpcClient;
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java
deleted file mode 100644
index 743d519..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.raft.rpc;
-
-/**
- *
- */
-public interface InvokeCallback<T extends Message> {
-    void complete(final T response, final Throwable err);
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Message.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Message.java
deleted file mode 100644
index 175d27b..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Message.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package org.apache.ignite.raft.rpc;
-
-public interface Message {
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java
deleted file mode 100644
index 966ff52..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.ignite.raft.rpc;
-
-import java.io.Serializable;
-
-/**
- * TODO FIXME asch must be elsewhere.
- */
-public interface Node extends Serializable {
-    String id();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
deleted file mode 100644
index ea6f10c..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.raft.rpc;
-
-/**
- * An node with an immutable id.
- * TODO FIXME asch must be elsewhere.
- */
-public class NodeImpl implements Node {
-    private final String id;
-
-    public NodeImpl(String id) {
-        super();
-        this.id = id;
-    }
-
-    @Override public String id() {
-        return id;
-    }
-
-    @Override public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        NodeImpl node = (NodeImpl) o;
-
-        if (!id.equals(node.id)) return false;
-
-        return true;
-    }
-
-    @Override public int hashCode() {
-        return id.hashCode();
-    }
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java
deleted file mode 100644
index 071360c..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.ignite.raft.rpc;
-
-/**
- * A message targeted to a specific raft group.
- */
-public interface RaftGroupMessage extends Message {
-    public String getGroupId();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java
deleted file mode 100644
index 4231dae..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.raft.rpc;
-
-import java.util.concurrent.Executor;
-
-/**
- *
- */
-public interface RpcClient {
-    /**
-     * Check connection for given address.
-     *
-     * @param node target address
-     * @return true if there is a connection and the connection is active and writable.
-     */
-    boolean checkConnection(final Node node);
-
-    /**
-     * Check connection for given address and async to create a new one if there is no connection.
-     *
-     * @param node       target address
-     * @param createIfAbsent create a new one if there is no connection
-     * @return true if there is a connection and the connection is active and writable.
-     */
-    boolean checkConnection(final Node node, final boolean createIfAbsent);
-
-    /**
-     * Close all connections of a address.
-     *
-     * @param node target address
-     */
-    void closeConnection(final Node node);
-
-    /**
-     * Asynchronous invocation with a callback.
-     *
-     * @param node  Target node.
-     * @param request   Request object
-     * @param callback  Invoke callback.
-     * @param executor  Executor to run invoke callback.
-     * @param timeoutMs Timeout millisecond.
-     */
-    void invokeAsync(final Node node, final Message request, InvokeCallback callback, Executor executor, final long timeoutMs);
-}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/MockUtils.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/MockUtils.java
new file mode 100644
index 0000000..9edc931
--- /dev/null
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/MockUtils.java
@@ -0,0 +1,71 @@
+package org.apache.ignite.raft.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.network.NetworkCluster;
+import org.apache.ignite.network.NetworkMember;
+import org.apache.ignite.raft.PeerId;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.apache.ignite.raft.client.message.RaftClientMessageFactoryImpl.INSTANCE;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+
+public class MockUtils {
+    public static PeerId LEADER = new PeerId(new NetworkMember("test"));
+
+    public static class TestInput1 {
+    }
+
+    public static class TestOutput1 {
+    }
+
+    public static class TestInput2 {
+    }
+
+    public static class TestOutput2 {
+    }
+
+    public static void mockUserInput1(NetworkCluster cluster) {
+        Mockito.doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
+                RaftClientMessages.UserResponse resp = INSTANCE.createUserResponse().setResponse(new TestOutput1()).build();
+
+                return CompletableFuture.completedFuture(resp);
+            }
+        }).when(cluster).sendWithResponse(eq(LEADER.getNode()), argThat(new ArgumentMatcher<RaftClientMessages.UserRequest>() {
+            @Override public boolean matches(RaftClientMessages.UserRequest arg) {
+                return arg.request() instanceof TestInput1;
+            }
+        }), anyLong());
+    }
+
+    public static void mockUserInput2(NetworkCluster cluster) {
+        Mockito.doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
+                RaftClientMessages.UserResponse resp = INSTANCE.createUserResponse().setResponse(new TestOutput2()).build();
+
+                return CompletableFuture.completedFuture(resp);
+            }
+        }).when(cluster).sendWithResponse(eq(LEADER.getNode()), argThat(new ArgumentMatcher<RaftClientMessages.UserRequest>() {
+            @Override public boolean matches(RaftClientMessages.UserRequest arg) {
+                return arg.request() instanceof TestInput2;
+            }
+        }), anyLong());
+    }
+
+    public static void mockLeaderRequest(NetworkCluster cluster, boolean timeout) {
+        Mockito.doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
+                RaftClientMessages.GetLeaderResponse resp = INSTANCE.createGetLeaderResponse().setLeaderId(LEADER).build();
+
+                return timeout ? CompletableFuture.failedFuture(new TimeoutException()) : CompletableFuture.completedFuture(resp);
+            }
+        }).when(cluster).sendWithResponse(eq(LEADER.getNode()), any(RaftClientMessages.GetLeaderRequest.class), anyLong());
+    }
+}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClientTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClientTest.java
index 5247c59..8ab61e3 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClientTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClientTest.java
@@ -2,62 +2,56 @@ package org.apache.ignite.raft.client.rpc;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
+import org.apache.ignite.network.NetworkCluster;
 import org.apache.ignite.raft.PeerId;
-import org.apache.ignite.raft.State;
-import org.apache.ignite.raft.client.RaftClientMessages.GetLeaderRequest;
+import org.apache.ignite.raft.client.MockUtils.TestInput1;
+import org.apache.ignite.raft.client.MockUtils.TestInput2;
+import org.apache.ignite.raft.client.MockUtils.TestOutput1;
+import org.apache.ignite.raft.client.MockUtils.TestOutput2;
+import org.apache.ignite.raft.client.RaftClientMessages;
 import org.apache.ignite.raft.client.rpc.impl.RaftGroupRpcClientImpl;
-import org.apache.ignite.raft.rpc.InvokeCallback;
-import org.apache.ignite.raft.rpc.Message;
-import org.apache.ignite.raft.rpc.NodeImpl;
-import org.apache.ignite.raft.rpc.RaftGroupMessage;
-import org.apache.ignite.raft.rpc.RpcClient;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.stubbing.Answer;
 
 import static java.util.Collections.singleton;
-import static org.apache.ignite.raft.client.message.RaftClientMessageBuilderFactory.INSTANCE;
+import static org.apache.ignite.raft.client.MockUtils.LEADER;
+import static org.apache.ignite.raft.client.MockUtils.mockLeaderRequest;
+import static org.apache.ignite.raft.client.MockUtils.mockUserInput1;
+import static org.apache.ignite.raft.client.MockUtils.mockUserInput2;
+import static org.apache.ignite.raft.client.message.RaftClientMessageFactoryImpl.INSTANCE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
 
 @ExtendWith(MockitoExtension.class)
 public class RaftGroupRpcClientTest {
     @Mock
-    private RpcClient rpcClient;
-
-    private static PeerId leader = new PeerId(new NodeImpl("test"));
+    private NetworkCluster cluster;
 
     @Test
     public void testRefreshLeader() throws Exception {
         String groupId = "test";
 
-        mockLeaderRequest(false);
+        mockLeaderRequest(cluster, false);
 
-        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, INSTANCE, 5_000, singleton(leader.getNode()));
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(cluster, INSTANCE, 5_000, singleton(LEADER.getNode()));
 
         PeerId leaderId = client.refreshLeader(groupId).get();
 
-        assertEquals(leader, client.state(groupId).leader());
-        assertEquals(leader, leaderId);
+        assertEquals(LEADER, client.state(groupId).leader());
+        assertEquals(LEADER, leaderId);
     }
 
     @Test
     public void testRefreshLeaderMultithreaded() throws Exception {
         String groupId = "test";
 
-        mockLeaderRequest(false);
+        mockLeaderRequest(cluster, false);
 
-        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, INSTANCE, 5_000, singleton(leader.getNode()));
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(cluster, INSTANCE, 5_000, singleton(LEADER.getNode()));
 
         int cnt = 20;
 
@@ -79,8 +73,8 @@ public class RaftGroupRpcClientTest {
                     try {
                         PeerId leaderId = client.refreshLeader(groupId).get();
 
-                        assertEquals(leader, client.state(groupId).leader());
-                        assertEquals(leader, leaderId);
+                        assertEquals(LEADER, client.state(groupId).leader());
+                        assertEquals(LEADER, leaderId);
                     }
                     catch (Exception e) {
                         fail(e);
@@ -99,10 +93,9 @@ public class RaftGroupRpcClientTest {
     public void testRefreshLeaderTimeout() throws Exception {
         String groupId = "test";
 
-        mockLeaderRequest(true);
+        mockLeaderRequest(cluster, true);
 
-        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, INSTANCE,
-            5_000, singleton(leader.getNode()));
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(cluster, INSTANCE, 5_000, singleton(LEADER.getNode()));
 
         try {
             client.refreshLeader(groupId).get();
@@ -118,66 +111,23 @@ public class RaftGroupRpcClientTest {
     public void testCustomMessage() throws Exception {
         String groupId = "test";
 
-        mockLeaderRequest(false);
-        mockCustomRequest();
-
-        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, INSTANCE,
-            5_000, singleton(leader.getNode()));
-
-        JunkRequest req = new JunkRequest(groupId);
-
-        Message resp = client.sendCustom(req).get();
-
-        State state = client.state(groupId);
-
-        // Expecting raft group state to be transparently loaded on first request.
-        assertEquals(leader, state.leader());
-
-        assertTrue(resp instanceof JunkResponse);
-    }
-
-    private static class JunkRequest implements RaftGroupMessage {
-        private final String groupId;
-
-        JunkRequest(String groupId) {
-            this.groupId = groupId;
-        }
+        mockLeaderRequest(cluster, false);
+        mockUserInput1(cluster);
+        mockUserInput2(cluster);
 
-        @Override public String getGroupId() {
-            return groupId;
-        }
-    }
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(cluster, INSTANCE, 5_000, singleton(LEADER.getNode()));
 
-    private static class JunkResponse implements Message {}
+        RaftClientMessages.UserRequest req1 =
+            client.factory().createUserRequest().setGroupId(groupId).setRequest(new TestInput1()).build();
 
-    private void mockCustomRequest() {
-        Mockito.doAnswer(new Answer() {
-            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
-                InvokeCallback callback = invocation.getArgument(2);
-                Executor executor = invocation.getArgument(3);
+        assertTrue(client.sendUserRequest(req1).get().response() instanceof TestOutput1);
 
-                executor.execute(() -> callback.complete(new JunkResponse(), null));
+        RaftClientMessages.UserRequest req2 =
+            client.factory().createUserRequest().setGroupId(groupId).setRequest(new TestInput2()).build();
 
-                return null;
-            }
-        }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(JunkRequest.class), any(), any(), anyLong());
-    }
+        assertTrue(client.sendUserRequest(req2).get().response() instanceof TestOutput2);
 
-    private void mockLeaderRequest(boolean timeout) {
-        Mockito.doAnswer(new Answer() {
-            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
-                InvokeCallback callback = invocation.getArgument(2);
-                Executor executor = invocation.getArgument(3);
-
-                executor.execute(() -> {
-                    if (timeout)
-                        callback.complete(null, new TimeoutException());
-                    else
-                        callback.complete(INSTANCE.createGetLeaderResponse().setLeaderId(leader).build(), null);
-                });
-
-                return null;
-            }
-        }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(GetLeaderRequest.class), any(), any(), anyLong());
+        // Expecting raft group state to be transparently loaded on first request.
+        assertEquals(LEADER, client.state(groupId).leader());
     }
 }
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestServiceTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestServiceTest.java
index 56e3ff8..1c802a0 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestServiceTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestServiceTest.java
@@ -1,51 +1,43 @@
 package org.apache.ignite.raft.client.service;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import org.apache.ignite.raft.PeerId;
-import org.apache.ignite.raft.client.RaftClientMessages;
-import org.apache.ignite.raft.client.RaftClientMessages.UserRequest;
+import org.apache.ignite.network.NetworkCluster;
+import org.apache.ignite.raft.client.MockUtils.TestInput1;
+import org.apache.ignite.raft.client.MockUtils.TestInput2;
+import org.apache.ignite.raft.client.MockUtils.TestOutput1;
+import org.apache.ignite.raft.client.MockUtils.TestOutput2;
 import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
 import org.apache.ignite.raft.client.rpc.impl.RaftGroupRpcClientImpl;
 import org.apache.ignite.raft.client.service.impl.RaftGroupClientRequestServiceImpl;
-import org.apache.ignite.raft.rpc.InvokeCallback;
-import org.apache.ignite.raft.rpc.NodeImpl;
-import org.apache.ignite.raft.rpc.RpcClient;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.stubbing.Answer;
 
 import static java.util.Collections.singleton;
-import static org.apache.ignite.raft.client.message.RaftClientMessageBuilderFactory.INSTANCE;
+import static org.apache.ignite.raft.client.MockUtils.LEADER;
+import static org.apache.ignite.raft.client.MockUtils.mockLeaderRequest;
+import static org.apache.ignite.raft.client.MockUtils.mockUserInput1;
+import static org.apache.ignite.raft.client.MockUtils.mockUserInput2;
+import static org.apache.ignite.raft.client.message.RaftClientMessageFactoryImpl.INSTANCE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.ArgumentMatchers.eq;
 
 @ExtendWith(MockitoExtension.class)
 public class RaftGroupClientRequestServiceTest {
     @Mock
-    private RpcClient rpcClient;
-
-    private static PeerId leader = new PeerId(new NodeImpl("test"));
+    private NetworkCluster cluster;
 
     @Test
     public void testUserRequest() throws Exception {
         String groupId = "test";
 
-        mockLeaderRequest();
-        mockUserRequest1();
-        mockUserRequest2();
+        mockLeaderRequest(cluster, false);
+        mockUserInput1(cluster);
+        mockUserInput2(cluster);
 
-        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, INSTANCE, 5_000, singleton(leader.getNode()));
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(cluster, INSTANCE, 5_000, singleton(LEADER.getNode()));
 
         RaftGroupClientRequestService service = new RaftGroupClientRequestServiceImpl(client, groupId);
 
@@ -63,70 +55,6 @@ public class RaftGroupClientRequestServiceTest {
 
         assertNotNull(output2);
 
-        assertEquals(leader, client.state(groupId).leader());
-    }
-
-    private static class TestInput1 {
-    }
-
-    private static class TestOutput1 {
-    }
-
-    private static class TestInput2 {
-    }
-
-    private static class TestOutput2 {
-    }
-
-    private void mockLeaderRequest() {
-        Mockito.doAnswer(new Answer() {
-            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
-                InvokeCallback callback = invocation.getArgument(2);
-                Executor executor = invocation.getArgument(3);
-
-                executor.execute(() -> {
-                    callback.complete(INSTANCE.createGetLeaderResponse().setLeaderId(leader).build(), null);
-                });
-
-                return null;
-            }
-        }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(RaftClientMessages.GetLeaderRequest.class),
-            any(), any(), anyLong());
-    }
-
-    private void mockUserRequest1() {
-        Mockito.doAnswer(new Answer() {
-            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
-                InvokeCallback callback = invocation.getArgument(2);
-                Executor executor = invocation.getArgument(3);
-
-                executor.execute(() -> callback.complete(INSTANCE.createUserResponse().
-                    setResponse(new TestOutput1()).build(), null));
-
-                return null;
-            }
-        }).when(rpcClient).invokeAsync(eq(leader.getNode()), argThat(new ArgumentMatcher<UserRequest>() {
-            @Override public boolean matches(UserRequest argument) {
-                return argument.request() instanceof TestInput1;
-            }
-        }), any(), any(), anyLong());
-    }
-
-    private void mockUserRequest2() {
-        Mockito.doAnswer(new Answer() {
-            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
-                InvokeCallback callback = invocation.getArgument(2);
-                Executor executor = invocation.getArgument(3);
-
-                executor.execute(() -> callback.complete(INSTANCE.createUserResponse().
-                    setResponse(new TestOutput2()).build(), null));
-
-                return null;
-            }
-        }).when(rpcClient).invokeAsync(eq(leader.getNode()), argThat(new ArgumentMatcher<UserRequest>() {
-            @Override public boolean matches(UserRequest argument) {
-                return argument.request() instanceof TestInput2;
-            }
-        }), any(), any(), anyLong());
+        assertEquals(LEADER, client.state(groupId).leader());
     }
 }