You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/03/01 17:06:07 UTC

[ignite-3] branch ignite-14149 updated (c4871ce -> 399f640)

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a change to branch ignite-14149
in repository https://gitbox.apache.org/repos/asf/ignite-3.git.


    from c4871ce  IGNITE-14149 wip.
     add db91bfb  IGNITE-14181 Support arrays of primitives in the configuration schema (#54)
     add ca0d35e  IGNITE-14194 Multiple storages support for configuration framework. (#55)
     add 552d887  IGNITE-14110 Networking module basic API and implementation - #53
     add c548a82  IGNITE-14110 Code style and maven configuration fixes
     new b22ccf8  Merge branch 'main' of https://gitbox.apache.org/repos/asf/ignite-3 into ignite-14149
     new 399f640  IGNITE-14149 Migrate to ignite network.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../processor/internal/ITProcessorTest.java        |  12 +-
 .../configuration/processor/internal/Types.java    |   3 +
 .../internal/TestConfigurationSchema.java          |   3 +
 .../processor/internal/Processor.java              | 138 +++++++---
 .../internal/util/ConfigurationUtilTest.java       |  32 ++-
 .../sample/ConfigurationArrayTest.java             | 105 ++++++++
 .../sample/storage/ConfigurationChangerTest.java   |  69 ++---
 .../sample/storage/TestConfigurationStorage.java   |   6 +-
 .../ignite/configuration/ConfigurationChanger.java | 277 ++++++++++++---------
 .../configuration/ConfigurationRegistry.java       |  20 ++
 .../org/apache/ignite/configuration/RootKey.java   |  13 +-
 .../apache/ignite/configuration/RootKeyImpl.java   |  56 +++++
 .../internal/util/ConfigurationUtil.java           | 130 +++++++++-
 .../storage/ConfigurationStorage.java              |   2 +-
 .../apache/ignite/configuration/storage/Data.java  |   6 +-
 .../ignite/configuration/tree/InnerNode.java       |   2 +-
 modules/{raft-client => network}/pom.xml           |  42 +++-
 .../ITScaleCubeNetworkClusterMessagingTest.java    |  97 ++++++++
 .../ignite/network/scalecube/TestMessage.java}     |  52 ++--
 .../scalecube/TestNetworkHandlersProvider.java     |  63 +++++
 .../ignite/network/MessageHandlerHolder.java       |  59 +++++
 .../org/apache/ignite/network/NetworkCluster.java  |  83 ++++++
 .../network/NetworkClusterEventHandler.java}       |  31 +--
 .../ignite/network/NetworkClusterFactory.java      |  75 ++++++
 .../ignite/network/NetworkHandlersProvider.java}   |  26 +-
 .../org/apache/ignite/network/NetworkMember.java}  |  48 ++--
 .../org/apache/ignite/network/NetworkMessage.java  |  59 +++++
 .../ignite/network/NetworkMessageHandler.java}     |  12 +-
 .../network/scalecube/ScaleCubeMemberResolver.java |  64 +++++
 .../network/scalecube/ScaleCubeMessageHandler.java | 100 ++++++++
 .../network/scalecube/ScaleCubeNetworkCluster.java | 113 +++++++++
 modules/raft-client/pom.xml                        |   5 +
 .../org/apache/ignite/raft/ElectionPriority.java   |   2 +-
 .../main/java/org/apache/ignite/raft/PeerId.java   |  18 +-
 .../java/org/apache/ignite/raft/RaftException.java |  16 --
 .../ignite/raft/client/RaftClientMessages.java     |  66 ++---
 .../client/message/AddLearnersRequestImpl.java     |   2 +-
 .../raft/client/message/AddPeerRequestImpl.java    |   2 +-
 .../raft/client/message/ChangePeerRequestImpl.java |   2 +-
 .../raft/client/message/GetLeaderRequestImpl.java  |   2 +-
 .../raft/client/message/GetPeersRequestImpl.java   |   2 +-
 .../raft/client/message/PingRequestImpl.java       |  21 --
 ...rFactory.java => RaftClientMessageFactory.java} |   6 +-
 ...tory.java => RaftClientMessageFactoryImpl.java} |  14 +-
 .../client/message/RemoveLearnersRequestImpl.java  |   2 +-
 .../raft/client/message/RemovePeerRequestImpl.java |   2 +-
 .../client/message/ResetLearnersRequestImpl.java   |   2 +-
 .../raft/client/message/ResetPeerRequestImpl.java  |   2 +-
 .../raft/client/message/SnapshotRequestImpl.java   |   2 +-
 .../raft/client/message/StatusResponseImpl.java    |  32 ---
 .../client/message/TransferLeaderRequestImpl.java  |   2 +-
 .../raft/client/message/UserRequestImpl.java       |  10 +-
 .../ignite/raft/client/rpc/RaftGroupRpcClient.java |  30 ++-
 .../client/rpc/impl/RaftGroupRpcClientImpl.java    | 135 ++--------
 .../service/RaftGroupClientRequestService.java     |   2 +-
 .../client/service/RaftGroupManagmentService.java  |   8 +-
 .../impl/RaftGroupClientRequestServiceImpl.java    |  12 +-
 .../impl/RaftGroupManagementServiceImpl.java       |   2 +-
 .../org/apache/ignite/raft/rpc/InvokeCallback.java |  24 --
 .../java/org/apache/ignite/raft/rpc/Message.java   |   4 -
 .../main/java/org/apache/ignite/raft/rpc/Node.java |  10 -
 .../apache/ignite/raft/rpc/RaftGroupMessage.java   |   8 -
 .../java/org/apache/ignite/raft/rpc/RpcClient.java |  59 -----
 .../org/apache/ignite/raft/client/MockUtils.java   |  71 ++++++
 .../raft/client/rpc/RaftGroupRpcClientTest.java    | 118 +++------
 .../service/RaftGroupClientRequestServiceTest.java | 104 ++------
 parent/pom.xml                                     |   7 +
 pom.xml                                            |   1 +
 68 files changed, 1742 insertions(+), 863 deletions(-)
 create mode 100644 modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/ConfigurationArrayTest.java
 create mode 100644 modules/configuration/src/main/java/org/apache/ignite/configuration/RootKeyImpl.java
 copy modules/{raft-client => network}/pom.xml (69%)
 create mode 100644 modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java
 rename modules/{raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java => network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessage.java} (53%)
 create mode 100644 modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java
 create mode 100644 modules/network/src/main/java/org/apache/ignite/network/MessageHandlerHolder.java
 create mode 100644 modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
 copy modules/{cli/src/main/java/org/apache/ignite/cli/builtins/module/ResolveResult.java => network/src/main/java/org/apache/ignite/network/NetworkClusterEventHandler.java} (57%)
 create mode 100644 modules/network/src/main/java/org/apache/ignite/network/NetworkClusterFactory.java
 copy modules/{configuration/src/main/java/org/apache/ignite/configuration/tree/NamedListView.java => network/src/main/java/org/apache/ignite/network/NetworkHandlersProvider.java} (64%)
 copy modules/{configuration/src/main/java/org/apache/ignite/configuration/internal/validation/MemberKey.java => network/src/main/java/org/apache/ignite/network/NetworkMember.java} (53%)
 create mode 100644 modules/network/src/main/java/org/apache/ignite/network/NetworkMessage.java
 copy modules/{configuration/src/main/java/org/apache/ignite/configuration/ConfigurationValue.java => network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java} (77%)
 create mode 100644 modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMemberResolver.java
 create mode 100644 modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java
 create mode 100644 modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
 delete mode 100644 modules/raft-client/src/main/java/org/apache/ignite/raft/RaftException.java
 delete mode 100644 modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/PingRequestImpl.java
 rename modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/{ClientMessageBuilderFactory.java => RaftClientMessageFactory.java} (89%)
 rename modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/{RaftClientMessageBuilderFactory.java => RaftClientMessageFactoryImpl.java} (85%)
 delete mode 100644 modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/StatusResponseImpl.java
 delete mode 100644 modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java
 delete mode 100644 modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Message.java
 delete mode 100644 modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java
 delete mode 100644 modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java
 delete mode 100644 modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java
 create mode 100644 modules/raft-client/src/test/java/org/apache/ignite/raft/client/MockUtils.java


[ignite-3] 02/02: IGNITE-14149 Migrate to ignite network.

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch ignite-14149
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 399f640174b26a38e336024c212debf42b80ef01
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Mon Mar 1 20:05:49 2021 +0300

    IGNITE-14149 Migrate to ignite network.
---
 .../org/apache/ignite/network/NetworkCluster.java  |  15 ++-
 .../network/scalecube/ScaleCubeNetworkCluster.java |  19 +--
 modules/raft-client/pom.xml                        |   5 +
 .../org/apache/ignite/raft/ElectionPriority.java   |   2 +-
 .../main/java/org/apache/ignite/raft/PeerId.java   |  18 +--
 .../java/org/apache/ignite/raft/RaftException.java |  16 ---
 .../ignite/raft/client/RaftClientMessages.java     |  66 ++++------
 .../client/message/AddLearnersRequestImpl.java     |   2 +-
 .../raft/client/message/AddPeerRequestImpl.java    |   2 +-
 .../raft/client/message/ChangePeerRequestImpl.java |   2 +-
 .../raft/client/message/GetLeaderRequestImpl.java  |   2 +-
 .../raft/client/message/GetPeersRequestImpl.java   |   2 +-
 .../raft/client/message/PingRequestImpl.java       |  21 ----
 ...rFactory.java => RaftClientMessageFactory.java} |   6 +-
 ...tory.java => RaftClientMessageFactoryImpl.java} |  14 +--
 .../client/message/RemoveLearnersRequestImpl.java  |   2 +-
 .../raft/client/message/RemovePeerRequestImpl.java |   2 +-
 .../client/message/ResetLearnersRequestImpl.java   |   2 +-
 .../raft/client/message/ResetPeerRequestImpl.java  |   2 +-
 .../raft/client/message/SnapshotRequestImpl.java   |   2 +-
 .../raft/client/message/StatusResponseImpl.java    |  32 -----
 .../client/message/TransferLeaderRequestImpl.java  |   2 +-
 .../raft/client/message/UserRequestImpl.java       |  10 +-
 .../ignite/raft/client/rpc/RaftGroupRpcClient.java |  30 +++--
 .../client/rpc/impl/RaftGroupRpcClientImpl.java    | 135 +++++----------------
 .../service/RaftGroupClientRequestService.java     |   2 +-
 .../client/service/RaftGroupManagmentService.java  |   8 +-
 .../impl/RaftGroupClientRequestServiceImpl.java    |  12 +-
 .../impl/RaftGroupManagementServiceImpl.java       |   2 +-
 .../org/apache/ignite/raft/rpc/InvokeCallback.java |  24 ----
 .../java/org/apache/ignite/raft/rpc/Message.java   |   4 -
 .../main/java/org/apache/ignite/raft/rpc/Node.java |  10 --
 .../java/org/apache/ignite/raft/rpc/NodeImpl.java  |  49 --------
 .../apache/ignite/raft/rpc/RaftGroupMessage.java   |   8 --
 .../java/org/apache/ignite/raft/rpc/RpcClient.java |  59 ---------
 .../org/apache/ignite/raft/client/MockUtils.java   |  71 +++++++++++
 .../raft/client/rpc/RaftGroupRpcClientTest.java    | 118 ++++++------------
 .../service/RaftGroupClientRequestServiceTest.java | 104 +++-------------
 38 files changed, 257 insertions(+), 625 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
index 5bdd576..f271b53 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
@@ -17,6 +17,7 @@
 package org.apache.ignite.network;
 
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 
 /**
@@ -59,7 +60,19 @@ public interface NetworkCluster {
      * @param member Network member which should receive the message.
      * @param msg Message which should be delivered.
      */
-    Future<?> guaranteedSend(NetworkMember member, Object msg);
+    Future<?> send(NetworkMember member, Object msg);
+
+    /**
+     * Sends asynchronously a message with same guarantees as for {@link #send(NetworkMember, Object)} and
+     * returns a response (RPC style).
+     *
+     * @param member Network member which should receive the message.
+     * @param msg A message.
+     * @param timeout Waiting for response timeout in milliseconds.
+     * @param <R> Expected response type.
+     * @return A future holding the response or error if the expected response was not received.
+     */
+    <R> CompletableFuture<R> sendWithResponse(NetworkMember member, Object msg, long timeout);
 
     /**
      * Add provider which allows to get configured handlers for different cluster events(ex. received message).
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
index 0dc1087..f2ebbac 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
@@ -16,9 +16,12 @@
  */
 package org.apache.ignite.network.scalecube;
 
+import io.scalecube.cluster.transport.api.Message;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import io.scalecube.cluster.Cluster;
 import org.apache.ignite.network.NetworkCluster;
@@ -29,6 +32,7 @@ import org.apache.ignite.network.NetworkMessageHandler;
 import org.apache.ignite.network.MessageHandlerHolder;
 
 import static io.scalecube.cluster.transport.api.Message.fromData;
+import static java.time.Duration.ofMillis;
 
 /**
  * Implementation of {@link NetworkCluster} based on ScaleCube.
@@ -84,15 +88,14 @@ public class ScaleCubeNetworkCluster implements NetworkCluster {
     }
 
     /** {@inheritDoc} */
-    @Override public Future<?> guaranteedSend(NetworkMember member, Object msg) {
-        cluster.send(memberResolver.resolveMember(member), fromData(msg))
-            .block();
-
-        CompletableFuture<Object> future = new CompletableFuture<>();
-
-        future.complete(null);
+    @Override public Future<?> send(NetworkMember member, Object msg) {
+        return cluster.send(memberResolver.resolveMember(member), fromData(msg)).toFuture();
+    }
 
-        return future;
+    /** {@inheritDoc} */
+    @Override public <R> CompletableFuture<R> sendWithResponse(NetworkMember member, Object msg, long timeout) {
+        return cluster.requestResponse(memberResolver.resolveMember(member), fromData(msg))
+            .timeout(ofMillis(timeout)).toFuture().thenApply(m -> m.data());
     }
 
     /** {@inheritDoc} */
diff --git a/modules/raft-client/pom.xml b/modules/raft-client/pom.xml
index 6242962..06be309 100644
--- a/modules/raft-client/pom.xml
+++ b/modules/raft-client/pom.xml
@@ -36,6 +36,11 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-network</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-engine</artifactId>
             <scope>test</scope>
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java
index 376a589..8dd5ddf 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java
@@ -21,7 +21,7 @@ package org.apache.ignite.raft;
  */
 public class ElectionPriority {
     /**
-     * Priority -1 represents this node disabled the priority election function.
+     * Priority -1 means this node has disabled the priority election function.
      */
     public static final int DISABLED = -1;
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
index 7b5fec0..9587c87 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
@@ -17,21 +17,21 @@
 package org.apache.ignite.raft;
 
 import java.io.Serializable;
-import org.apache.ignite.raft.rpc.Node;
+import org.apache.ignite.network.NetworkMember;
 
 /**
  * Represents a participant in a replicating group.
  */
-public class PeerId implements Serializable {
+public final class PeerId implements Serializable {
     private static final long serialVersionUID = 8083529734784884641L;
 
     /**
-     * Owning node.
+     * Cluster node for peer.
      */
-    private final Node node;
+    private final NetworkMember node;
 
     /**
-     * Node's local priority value, if node don't support priority election, this value is -1.
+     * Peer's local priority value, if node don't support priority election, this value is -1.
      */
     private final int priority;
 
@@ -40,17 +40,17 @@ public class PeerId implements Serializable {
         this.priority = peer.getPriority();
     }
 
-    public PeerId(Node node) {
+    public PeerId(NetworkMember node) {
         this(node, ElectionPriority.DISABLED);
     }
 
-    public PeerId(final Node node, final int priority) {
+    public PeerId(final NetworkMember node, final int priority) {
         super();
         this.node = node;
         this.priority = priority;
     }
 
-    public Node getNode() {
+    public NetworkMember getNode() {
         return this.node;
     }
 
@@ -77,6 +77,6 @@ public class PeerId implements Serializable {
     }
 
     @Override public String toString() {
-        return node.id() + ":" + priority;
+        return node.name() + ":" + priority;
     }
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/RaftException.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/RaftException.java
deleted file mode 100644
index f63f6f0..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/RaftException.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.ignite.raft;
-
-/** */
-public class RaftException extends Exception {
-    private final int statusCode;
-
-    public RaftException(int statusCode, String statusMsg) {
-        super(statusMsg);
-
-        this.statusCode = statusCode;
-    }
-
-    public int getStatusCode() {
-        return statusCode;
-    }
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientMessages.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientMessages.java
index 3f44685..c809a0d 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientMessages.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientMessages.java
@@ -21,8 +21,6 @@ package org.apache.ignite.raft.client;
 
 import java.util.List;
 import org.apache.ignite.raft.PeerId;
-import org.apache.ignite.raft.rpc.Message;
-import org.apache.ignite.raft.rpc.RaftGroupMessage;
 
 /**
  *
@@ -31,21 +29,7 @@ public final class RaftClientMessages {
     private RaftClientMessages() {
     }
 
-    public interface StatusResponse extends Message {
-        int getStatusCode();
-
-        String getStatusMsg();
-
-        interface Builder {
-            Builder setStatusCode(int code);
-
-            Builder setStatusMsg(String msg);
-
-            StatusResponse build();
-        }
-    }
-
-    public interface PingRequest extends Message {
+    public interface PingRequest {
         long getSendTimestamp();
 
         interface Builder {
@@ -55,7 +39,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface AddPeerRequest extends RaftGroupMessage {
+    public interface AddPeerRequest {
         PeerId getPeerId();
 
         interface Builder {
@@ -67,7 +51,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface AddPeerResponse extends Message {
+    public interface AddPeerResponse {
         List<PeerId> getOldPeersList();
 
         List<PeerId> getNewPeersList();
@@ -81,7 +65,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface RemovePeerRequest extends RaftGroupMessage {
+    public interface RemovePeerRequest {
         PeerId getPeerId();
 
         interface Builder {
@@ -93,7 +77,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface RemovePeerResponse extends Message {
+    public interface RemovePeerResponse {
         List<PeerId> getOldPeersList();
 
         List<PeerId> getNewPeersList();
@@ -107,7 +91,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface ChangePeersRequest extends RaftGroupMessage {
+    public interface ChangePeersRequest {
         List<PeerId> getNewPeersList();
 
         public interface Builder {
@@ -119,7 +103,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface ChangePeersResponse extends Message {
+    public interface ChangePeersResponse {
         List<PeerId> getOldPeersList();
 
         List<PeerId> getNewPeersList();
@@ -133,7 +117,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface SnapshotRequest extends RaftGroupMessage {
+    public interface SnapshotRequest {
         public interface Builder {
             Builder setGroupId(String groupId);
 
@@ -141,7 +125,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface ResetPeerRequest extends RaftGroupMessage {
+    public interface ResetPeerRequest {
         List<PeerId> getNewPeersList();
 
         public interface Builder {
@@ -153,7 +137,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface TransferLeaderRequest extends RaftGroupMessage {
+    public interface TransferLeaderRequest {
         PeerId getPeerId();
 
         public interface Builder {
@@ -163,7 +147,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface GetLeaderRequest extends RaftGroupMessage {
+    public interface GetLeaderRequest {
         public interface Builder {
             Builder setGroupId(String groupId);
 
@@ -171,7 +155,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface GetLeaderResponse extends Message {
+    public interface GetLeaderResponse {
         PeerId getLeaderId();
 
         public interface Builder {
@@ -181,7 +165,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface GetPeersRequest extends RaftGroupMessage {
+    public interface GetPeersRequest {
         boolean getOnlyAlive();
 
         public interface Builder {
@@ -193,7 +177,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface GetPeersResponse extends Message {
+    public interface GetPeersResponse {
         List<PeerId> getPeersList();
 
         List<PeerId> getLearnersList();
@@ -207,7 +191,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface AddLearnersRequest extends RaftGroupMessage {
+    public interface AddLearnersRequest {
         List<PeerId> getLearnersList();
 
         public interface Builder {
@@ -219,7 +203,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface RemoveLearnersRequest extends RaftGroupMessage {
+    public interface RemoveLearnersRequest {
         List<PeerId> getLearnersList();
 
         public interface Builder {
@@ -231,7 +215,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface ResetLearnersRequest extends RaftGroupMessage {
+    public interface ResetLearnersRequest {
         List<PeerId> getLearnersList();
 
         public interface Builder {
@@ -243,7 +227,7 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface LearnersOpResponse extends Message {
+    public interface LearnersOpResponse  {
         List<PeerId> getOldLearnersList();
 
         List<PeerId> getNewLearnersList();
@@ -257,19 +241,21 @@ public final class RaftClientMessages {
         }
     }
 
-    public interface UserRequest<T> extends RaftGroupMessage {
-        T request();
+    public interface UserRequest {
+        Object request();
 
-        public interface Builder<T> {
-            Builder setRequest(T request);
+        String getGroupId();
+
+        public interface Builder {
+            Builder setRequest(Object request);
 
             Builder setGroupId(String groupId);
 
-            UserRequest<T> build();
+            UserRequest build();
         }
     }
 
-    public interface UserResponse<T> extends Message {
+    public interface UserResponse<T> {
         T response();
 
         public interface Builder<T> {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequestImpl.java
index 2bb1125..dec9076 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequestImpl.java
@@ -10,7 +10,7 @@ public class AddLearnersRequestImpl implements RaftClientMessages.AddLearnersReq
     private PeerId leaderId;
     private List<PeerId> learnersList = new ArrayList<>();
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeerRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeerRequestImpl.java
index ba6d2c8..d18ee14 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeerRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeerRequestImpl.java
@@ -8,7 +8,7 @@ class AddPeerRequestImpl implements RaftClientMessages.AddPeerRequest, RaftClien
     private PeerId leaderId;
     private PeerId peerId;
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeerRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeerRequestImpl.java
index 47d3abc..4c611c7 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeerRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeerRequestImpl.java
@@ -9,7 +9,7 @@ class ChangePeerRequestImpl implements RaftClientMessages.ChangePeersRequest, Ra
     private String groupId;
     private List<PeerId> newPeersList = new ArrayList<>();
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java
index dbc7566..4162f81 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java
@@ -6,7 +6,7 @@ import org.apache.ignite.raft.client.RaftClientMessages;
 public class GetLeaderRequestImpl implements RaftClientMessages.GetLeaderRequest, RaftClientMessages.GetLeaderRequest.Builder {
     private String groupId;
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequestImpl.java
index ea50a57..d3a2d99 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequestImpl.java
@@ -6,7 +6,7 @@ class GetPeersRequestImpl implements RaftClientMessages.GetPeersRequest, RaftCli
     private String groupId;
     private boolean onlyAlive;
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/PingRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/PingRequestImpl.java
deleted file mode 100644
index b0c0a78..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/PingRequestImpl.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package org.apache.ignite.raft.client.message;
-
-import org.apache.ignite.raft.client.RaftClientMessages;
-
-class PingRequestImpl implements RaftClientMessages.PingRequest, RaftClientMessages.PingRequest.Builder {
-    private long sendTimestamp;
-
-    @Override public long getSendTimestamp() {
-        return sendTimestamp;
-    }
-
-    @Override public Builder setSendTimestamp(long timestamp) {
-        this.sendTimestamp = timestamp;
-
-        return this;
-    }
-
-    @Override public RaftClientMessages.PingRequest build() {
-        return this;
-    }
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ClientMessageBuilderFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactory.java
similarity index 89%
rename from modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ClientMessageBuilderFactory.java
rename to modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactory.java
index 4351a67..95a350d 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ClientMessageBuilderFactory.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactory.java
@@ -3,11 +3,7 @@ package org.apache.ignite.raft.client.message;
 import org.apache.ignite.raft.client.RaftClientMessages;
 
 /** */
-public interface ClientMessageBuilderFactory {
-    RaftClientMessages.PingRequest.Builder createPingRequest();
-
-    RaftClientMessages.StatusResponse.Builder createStatusResponse();
-
+public interface RaftClientMessageFactory {
     RaftClientMessages.AddPeerRequest.Builder createAddPeerRequest();
 
     RaftClientMessages.AddPeerResponse.Builder createAddPeerResponse();
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageBuilderFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactoryImpl.java
similarity index 85%
rename from modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageBuilderFactory.java
rename to modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactoryImpl.java
index 2d47315..6e170ae 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageBuilderFactory.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactoryImpl.java
@@ -3,18 +3,10 @@ package org.apache.ignite.raft.client.message;
 import org.apache.ignite.raft.client.RaftClientMessages;
 
 /**
- * Raft client message builders factory.
+ * Raft client message factory.
  */
-public class RaftClientMessageBuilderFactory implements ClientMessageBuilderFactory {
-    public static RaftClientMessageBuilderFactory INSTANCE = new RaftClientMessageBuilderFactory();
-
-    @Override public RaftClientMessages.PingRequest.Builder createPingRequest() {
-        return new PingRequestImpl();
-    }
-
-    @Override public RaftClientMessages.StatusResponse.Builder createStatusResponse() {
-        return new StatusResponseImpl();
-    }
+public class RaftClientMessageFactoryImpl implements RaftClientMessageFactory {
+    public static RaftClientMessageFactoryImpl INSTANCE = new RaftClientMessageFactoryImpl();
 
     @Override public RaftClientMessages.AddPeerRequest.Builder createAddPeerRequest() {
         return new AddPeerRequestImpl();
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequestImpl.java
index e55aefc..b319b69 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequestImpl.java
@@ -9,7 +9,7 @@ class RemoveLearnersRequestImpl implements RaftClientMessages.RemoveLearnersRequ
     private String groupId;
     private List<PeerId> learnersList = new ArrayList<>();
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeerRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeerRequestImpl.java
index 0e4934d..a6d57b1 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeerRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeerRequestImpl.java
@@ -7,7 +7,7 @@ class RemovePeerRequestImpl implements RaftClientMessages.RemovePeerRequest, Raf
     private String groupId;
     private PeerId peerId;
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetLearnersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetLearnersRequestImpl.java
index 340a39f..e1d144d 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetLearnersRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetLearnersRequestImpl.java
@@ -9,7 +9,7 @@ class ResetLearnersRequestImpl implements RaftClientMessages.ResetLearnersReques
     private String groupId;
     private List<PeerId> learnersList = new ArrayList<>();
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetPeerRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetPeerRequestImpl.java
index 6c8d4bc..4ead585 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetPeerRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetPeerRequestImpl.java
@@ -9,7 +9,7 @@ class ResetPeerRequestImpl implements RaftClientMessages.ResetPeerRequest, RaftC
     private String groupId;
     private List<PeerId> newPeersList = new ArrayList<>();
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequestImpl.java
index 08a09dc..2b2eea6 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequestImpl.java
@@ -5,7 +5,7 @@ import org.apache.ignite.raft.client.RaftClientMessages;
 class SnapshotRequestImpl implements RaftClientMessages.SnapshotRequest, RaftClientMessages.SnapshotRequest.Builder {
     private String groupId;
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/StatusResponseImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/StatusResponseImpl.java
deleted file mode 100644
index 6ae0b6b..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/StatusResponseImpl.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package org.apache.ignite.raft.client.message;
-
-import org.apache.ignite.raft.client.RaftClientMessages;
-
-class StatusResponseImpl implements RaftClientMessages.StatusResponse, RaftClientMessages.StatusResponse.Builder {
-    private int errorCode;
-    private String errorMsg = "";
-
-    @Override public int getStatusCode() {
-        return errorCode;
-    }
-
-    @Override public Builder setStatusCode(int errorCode) {
-        this.errorCode = errorCode;
-
-        return this;
-    }
-
-    @Override public String getStatusMsg() {
-        return errorMsg;
-    }
-
-    @Override public Builder setStatusMsg(String errorMsg) {
-        this.errorMsg = errorMsg;
-
-        return this;
-    }
-
-    @Override public RaftClientMessages.StatusResponse build() {
-        return this;
-    }
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeaderRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeaderRequestImpl.java
index 2cc6592..24957df 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeaderRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeaderRequestImpl.java
@@ -7,7 +7,7 @@ class TransferLeaderRequestImpl implements RaftClientMessages.TransferLeaderRequ
     private String groupId;
     private PeerId peerId;
 
-    @Override public String getGroupId() {
+    public String getGroupId() {
         return groupId;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/UserRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/UserRequestImpl.java
index 06d667e..93cc463 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/UserRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/UserRequestImpl.java
@@ -2,11 +2,11 @@ package org.apache.ignite.raft.client.message;
 
 import org.apache.ignite.raft.client.RaftClientMessages;
 
-public class UserRequestImpl<T> implements RaftClientMessages.UserRequest<T>, RaftClientMessages.UserRequest.Builder<T> {
-    private T request;
+public class UserRequestImpl<T> implements RaftClientMessages.UserRequest, RaftClientMessages.UserRequest.Builder {
+    private Object request;
     private String groupId;
 
-    @Override public T request() {
+    @Override public Object request() {
         return request;
     }
 
@@ -16,13 +16,13 @@ public class UserRequestImpl<T> implements RaftClientMessages.UserRequest<T>, Ra
         return this;
     }
 
-    @Override public Builder setRequest(T request) {
+    @Override public Builder setRequest(Object request) {
         this.request = request;
 
         return this;
     }
 
-    @Override public RaftClientMessages.UserRequest<T> build() {
+    @Override public RaftClientMessages.UserRequest build() {
         return this;
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
index 14384e9..382e953 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
@@ -17,11 +17,12 @@
 package org.apache.ignite.raft.client.rpc;
 
 import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
 import org.apache.ignite.raft.State;
 import org.apache.ignite.raft.PeerId;
-import org.apache.ignite.raft.client.message.ClientMessageBuilderFactory;
-import org.apache.ignite.raft.rpc.Message;
-import org.apache.ignite.raft.rpc.RaftGroupMessage;
+import org.apache.ignite.raft.client.RaftClientMessages.UserRequest;
+import org.apache.ignite.raft.client.RaftClientMessages.UserResponse;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
 
 import static org.apache.ignite.raft.client.RaftClientMessages.AddLearnersRequest;
 import static org.apache.ignite.raft.client.RaftClientMessages.AddPeerRequest;
@@ -35,30 +36,27 @@ import static org.apache.ignite.raft.client.RaftClientMessages.RemovePeerRespons
 import static org.apache.ignite.raft.client.RaftClientMessages.ResetLearnersRequest;
 import static org.apache.ignite.raft.client.RaftClientMessages.ResetPeerRequest;
 import static org.apache.ignite.raft.client.RaftClientMessages.SnapshotRequest;
-import static org.apache.ignite.raft.client.RaftClientMessages.StatusResponse;
 import static org.apache.ignite.raft.client.RaftClientMessages.TransferLeaderRequest;
 
 /**
- * Low-level raft group RPC client.
- * <p>
- * Additionally maintains raft group state.
+ * Replicating group RPC client.
  */
 public interface RaftGroupRpcClient {
     /**
      * @param groupId Group id.
      * @return Group state snapshot.
      */
-    State state(String groupId);
+    @NotNull State state(String groupId);
 
     /**
-     * Refreshes a group leader.
+     * Refreshes a replicating group leader.
      * @param groupId Group id.
      * @return A future.
      */
     CompletableFuture<PeerId> refreshLeader(String groupId);
 
     /**
-     * Refreshes group members (but without a leader).
+     * Refreshes a replicating group members (except a leader).
      * @param groupId Group id.
      * @return A future.
      */
@@ -88,7 +86,7 @@ public interface RaftGroupRpcClient {
      * @param request   request data
      * @return A future with result.
      */
-    CompletableFuture<StatusResponse> resetPeers(PeerId peerId, ResetPeerRequest request);
+    CompletableFuture<Void> resetPeers(PeerId peerId, ResetPeerRequest request);
 
     /**
      * Takes a local snapshot.
@@ -98,7 +96,7 @@ public interface RaftGroupRpcClient {
      * @param done      callback
      * @return a future with result
      */
-    CompletableFuture<StatusResponse> snapshot(PeerId peerId, SnapshotRequest request);
+    CompletableFuture<Void> snapshot(PeerId peerId, SnapshotRequest request);
 
     /**
      * Change peers.
@@ -148,20 +146,20 @@ public interface RaftGroupRpcClient {
      * @param done      callback
      * @return a future with result
      */
-    CompletableFuture<StatusResponse> transferLeader(TransferLeaderRequest request);
+    CompletableFuture<Void> transferLeader(TransferLeaderRequest request);
 
     /**
-     * Performs a custom action defined by specific request on the raft group leader.
+     * Performs a user action defined by specific request to the raft group leader.
      *
      * @param endpoint  server address
      * @param request   request data
      * @param done      callback
      * @return a future with result
      */
-    <R extends Message> CompletableFuture<R> sendCustom(RaftGroupMessage request);
+    <R> CompletableFuture<UserResponse<R>> sendUserRequest(UserRequest request);
 
     /**
      * @return A message builder factory.
      */
-    ClientMessageBuilderFactory factory();
+    RaftClientMessageFactory factory();
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
index 4599556..4f46ed9 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
@@ -6,53 +6,43 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
+import org.apache.ignite.network.NetworkCluster;
+import org.apache.ignite.network.NetworkMember;
 import org.apache.ignite.raft.PeerId;
-import org.apache.ignite.raft.RaftException;
 import org.apache.ignite.raft.State;
 import org.apache.ignite.raft.client.RaftClientMessages;
 import org.apache.ignite.raft.client.RaftClientMessages.GetLeaderResponse;
-import org.apache.ignite.raft.client.RaftClientMessages.StatusResponse;
-import org.apache.ignite.raft.client.message.ClientMessageBuilderFactory;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
 import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
-import org.apache.ignite.raft.rpc.InvokeCallback;
-import org.apache.ignite.raft.rpc.Message;
-import org.apache.ignite.raft.rpc.Node;
-import org.apache.ignite.raft.rpc.RaftGroupMessage;
-import org.apache.ignite.raft.rpc.RpcClient;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 
 public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
-    private final ExecutorService executor;
-    private final int defaultTimeout;
-    private final RpcClient rpcClient;
+    private final NetworkCluster cluster;
 
     /** Where to ask for initial configuration. */
-    private final Set<Node> initialCfgNodes;
+    private final Set<NetworkMember> initialCfgNodes;
+
+    /** */
+    private final RaftClientMessageFactory factory;
 
     /** */
-    private final ClientMessageBuilderFactory factory;
+    private final int defaultTimeout;
 
     private Map<String, StateImpl> states = new ConcurrentHashMap<>();
 
     /**
      * Accepts dependencies in constructor.
-     * @param rpcClient
-     * @param defaultTimeout
+     * @param cluster Cluster.
+     * @param defaultTimeout Default request timeout.
      * @param initialCfgNode Initial configuration nodes.
      */
-    public RaftGroupRpcClientImpl(RpcClient rpcClient, ClientMessageBuilderFactory factory, int defaultTimeout, Set<Node> initialCfgNodes) {
+    public RaftGroupRpcClientImpl(NetworkCluster cluster, RaftClientMessageFactory factory, int defaultTimeout, Set<NetworkMember> initialCfgNodes) {
         this.defaultTimeout = defaultTimeout;
-        this.rpcClient = rpcClient;
+        this.cluster = cluster;
         this.factory = factory;
         this.initialCfgNodes = new HashSet<>(initialCfgNodes);
-        executor = Executors.newWorkStealingPool();
     }
 
     @Override public State state(String groupId) {
@@ -60,27 +50,17 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
     }
 
     private StateImpl getState(String groupId) {
-        StateImpl newState = new StateImpl();
-
-        StateImpl state = states.putIfAbsent(groupId, newState);
-
-        if (state == null)
-            state = newState;
-
-        return state;
+        return states.computeIfAbsent(groupId, k -> new StateImpl());
     }
 
     @Override public CompletableFuture<PeerId> refreshLeader(String groupId) {
         StateImpl state = getState(groupId);
 
-        return refreshLeader(initialCfgNodes.iterator().next(), groupId).
-            thenApply(resp -> {
-                PeerId leaderId = resp.getLeaderId();
+        RaftClientMessages.GetLeaderRequest req = factory.createGetLeaderRequest().setGroupId(groupId).build();
 
-                state.leader = leaderId;
+        CompletableFuture<GetLeaderResponse> fut = cluster.sendWithResponse(initialCfgNodes.iterator().next(), req, defaultTimeout);
 
-                return leaderId;
-            });
+        return fut.thenApply(resp -> state.leader = resp.getLeaderId());
     }
 
     @Override public CompletableFuture<State> refreshMembers(String groupId) {
@@ -95,11 +75,11 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
         return null;
     }
 
-    @Override public CompletableFuture<StatusResponse> resetPeers(PeerId peerId, RaftClientMessages.ResetPeerRequest request) {
+    @Override public CompletableFuture<Void> resetPeers(PeerId peerId, RaftClientMessages.ResetPeerRequest request) {
         return null;
     }
 
-    @Override public CompletableFuture<StatusResponse> snapshot(PeerId peerId, RaftClientMessages.SnapshotRequest request) {
+    @Override public CompletableFuture<Void> snapshot(PeerId peerId, RaftClientMessages.SnapshotRequest request) {
         return null;
     }
 
@@ -119,82 +99,23 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
         return null;
     }
 
-    @Override public CompletableFuture<StatusResponse> transferLeader(RaftClientMessages.TransferLeaderRequest request) {
+    @Override public CompletableFuture<Void> transferLeader(RaftClientMessages.TransferLeaderRequest request) {
         return null;
     }
 
-    private CompletableFuture<GetLeaderResponse> refreshLeader(Node node, String groupId) {
-        StateImpl state = getState(groupId);
-
-        while(true) {
-            CompletableFuture<GetLeaderResponse> fut = state.updateFutRef.get();
-
-            if (fut != null)
-                return fut;
-
-            if (state.updateFutRef.compareAndSet(null, (fut = new CompletableFuture<>()))) {
-                RaftClientMessages.GetLeaderRequest req = factory.createGetLeaderRequest().setGroupId(groupId).build();
+    @Override public CompletableFuture<RaftClientMessages.UserResponse> sendUserRequest(RaftClientMessages.UserRequest request) {
+        if (request.getGroupId() == null)
+            throw new IllegalArgumentException("groupId is required");
 
-                CompletableFuture<GetLeaderResponse> finalFut = fut;
-
-                rpcClient.invokeAsync(node, req, new InvokeCallback<GetLeaderResponse>() {
-                    @Override public void complete(GetLeaderResponse response, Throwable err) {
-                        if (err != null)
-                            finalFut.completeExceptionally(err);
-                        else
-                            finalFut.complete(response);
-
-                        state.updateFutRef.set(null);
-                    }
-                }, executor, defaultTimeout);
-
-                return fut;
-            }
-        }
-    }
-
-    @Override public <R extends Message> CompletableFuture<R> sendCustom(RaftGroupMessage request) {
         State state = state(request.getGroupId());
 
-        CompletableFuture<R> fut = new CompletableFuture<>();
-
-        fut.orTimeout(defaultTimeout, TimeUnit.MILLISECONDS);
-
         CompletableFuture<PeerId> fut0 = state.leader() == null ?
             refreshLeader(request.getGroupId()) : completedFuture(state.leader());
 
-        fut0.whenComplete(new BiConsumer<PeerId, Throwable>() {
-            @Override public void accept(PeerId peerId, Throwable error) {
-                if (error == null) {
-                    rpcClient.invokeAsync(peerId.getNode(), request, new InvokeCallback<R>() {
-                        @Override public void complete(R response, Throwable err) {
-                            if (err != null)
-                                fut.completeExceptionally(err);
-                            else {
-                                if (response instanceof StatusResponse) {
-                                    StatusResponse resp = (StatusResponse) response;
-
-                                    // Translate error response to exception with the code.
-                                    if (resp.getStatusCode() != 0)
-                                        fut.completeExceptionally(new RaftException(resp.getStatusCode(), resp.getStatusMsg()));
-                                    else
-                                        fut.complete(response);
-                                } else
-                                    fut.complete(response);
-                            }
-                        }
-                    }, executor, defaultTimeout);
-                }
-                else {
-                    fut.completeExceptionally(error);
-                }
-            }
-        });
-
-        return fut;
-    }
-
-    @Override public ClientMessageBuilderFactory factory() {
+        return fut0.thenCompose(peerId -> cluster.sendWithResponse(peerId.getNode(), request, defaultTimeout));
+    }
+
+    @Override public RaftClientMessageFactory factory() {
         return this.factory;
     }
 
@@ -205,8 +126,6 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
 
         private volatile List<PeerId> learners;
 
-        private AtomicReference<CompletableFuture<GetLeaderResponse>> updateFutRef = new AtomicReference<>();
-
         @Override public @Nullable PeerId leader() {
             return leader;
         }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java
index 2c5e04a..a4a8a59 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java
@@ -13,5 +13,5 @@ public interface RaftGroupClientRequestService {
      * @param <R> Response.
      * @return A future.
      */
-    <T, R> CompletableFuture<R> submit(T request);
+    <R> CompletableFuture<R> submit(Object request);
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupManagmentService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupManagmentService.java
index d868dc9..2bd8fac 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupManagmentService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupManagmentService.java
@@ -57,7 +57,7 @@ public interface RaftGroupManagmentService {
      * @param peerId  Peer id.
      * @param request   request data
      * @param done      callback
-     * @return a future with result
+     * @return a future with result.
      */
     CompletableFuture<Void> snapshot(PeerId peerId);
 
@@ -77,7 +77,7 @@ public interface RaftGroupManagmentService {
      * @param endpoint  server address
      * @param request   request data
      * @param done      callback
-     * @return a future with result
+     * @return a future with result.
      */
     CompletableFuture<PeersChangeState> addLearners(List<PeerId> peers);
 
@@ -97,7 +97,7 @@ public interface RaftGroupManagmentService {
      * @param endpoint  server address
      * @param request   request data
      * @param done      callback
-     * @return a future with result
+     * @return a future with result.
      */
     CompletableFuture<PeersChangeState> resetLearners(List<PeerId> peers);
 
@@ -107,7 +107,7 @@ public interface RaftGroupManagmentService {
      * @param endpoint  server address
      * @param request   request data
      * @param done      callback
-     * @return a future with result
+     * @return a future with result.
      */
     CompletableFuture<Void> transferLeader(PeerId newLeader);
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java
index 16d63c4..c5b7c25 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java
@@ -1,11 +1,9 @@
 package org.apache.ignite.raft.client.service.impl;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
 import org.apache.ignite.raft.client.RaftClientMessages;
 import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
 import org.apache.ignite.raft.client.service.RaftGroupClientRequestService;
-import org.apache.ignite.raft.rpc.Message;
 
 public class RaftGroupClientRequestServiceImpl implements RaftGroupClientRequestService {
     private final RaftGroupRpcClient rpcClient;
@@ -16,16 +14,12 @@ public class RaftGroupClientRequestServiceImpl implements RaftGroupClientRequest
         this.groupId = groupId;
     }
 
-    @Override public <T, R> CompletableFuture<R> submit(T request) {
+    @Override public <R> CompletableFuture<R> submit(Object request) {
         RaftClientMessages.UserRequest r =
             rpcClient.factory().createUserRequest().setRequest(request).setGroupId(groupId).build();
 
-        return rpcClient.sendCustom(r).thenApply(new Function<Message, R>() {
-            @Override public R apply(Message message) {
-                RaftClientMessages.UserResponse<R> resp = (RaftClientMessages.UserResponse<R>) message;
+        CompletableFuture<RaftClientMessages.UserResponse<R>> completableFuture = rpcClient.sendUserRequest(r);
 
-                return resp.response();
-            }
-        });
+        return completableFuture.thenApply(resp -> resp.response());
     }
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java
index a70c1fb..28e28c8 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java
@@ -8,7 +8,7 @@ import org.apache.ignite.raft.client.service.RaftGroupManagmentService;
 import org.jetbrains.annotations.Nullable;
 
 public class RaftGroupManagementServiceImpl implements RaftGroupManagmentService {
-    private RaftGroupRpcClient rpcClient;
+    private final RaftGroupRpcClient rpcClient;
 
     public RaftGroupManagementServiceImpl(RaftGroupRpcClient rpcClient) {
         this.rpcClient = rpcClient;
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java
deleted file mode 100644
index 743d519..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.raft.rpc;
-
-/**
- *
- */
-public interface InvokeCallback<T extends Message> {
-    void complete(final T response, final Throwable err);
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Message.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Message.java
deleted file mode 100644
index 175d27b..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Message.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package org.apache.ignite.raft.rpc;
-
-public interface Message {
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java
deleted file mode 100644
index 966ff52..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.ignite.raft.rpc;
-
-import java.io.Serializable;
-
-/**
- * TODO FIXME asch must be elsewhere.
- */
-public interface Node extends Serializable {
-    String id();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
deleted file mode 100644
index ea6f10c..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.raft.rpc;
-
-/**
- * An node with an immutable id.
- * TODO FIXME asch must be elsewhere.
- */
-public class NodeImpl implements Node {
-    private final String id;
-
-    public NodeImpl(String id) {
-        super();
-        this.id = id;
-    }
-
-    @Override public String id() {
-        return id;
-    }
-
-    @Override public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        NodeImpl node = (NodeImpl) o;
-
-        if (!id.equals(node.id)) return false;
-
-        return true;
-    }
-
-    @Override public int hashCode() {
-        return id.hashCode();
-    }
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java
deleted file mode 100644
index 071360c..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.ignite.raft.rpc;
-
-/**
- * A message targeted to a specific raft group.
- */
-public interface RaftGroupMessage extends Message {
-    public String getGroupId();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java
deleted file mode 100644
index 4231dae..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.raft.rpc;
-
-import java.util.concurrent.Executor;
-
-/**
- *
- */
-public interface RpcClient {
-    /**
-     * Check connection for given address.
-     *
-     * @param node target address
-     * @return true if there is a connection and the connection is active and writable.
-     */
-    boolean checkConnection(final Node node);
-
-    /**
-     * Check connection for given address and async to create a new one if there is no connection.
-     *
-     * @param node       target address
-     * @param createIfAbsent create a new one if there is no connection
-     * @return true if there is a connection and the connection is active and writable.
-     */
-    boolean checkConnection(final Node node, final boolean createIfAbsent);
-
-    /**
-     * Close all connections of a address.
-     *
-     * @param node target address
-     */
-    void closeConnection(final Node node);
-
-    /**
-     * Asynchronous invocation with a callback.
-     *
-     * @param node  Target node.
-     * @param request   Request object
-     * @param callback  Invoke callback.
-     * @param executor  Executor to run invoke callback.
-     * @param timeoutMs Timeout millisecond.
-     */
-    void invokeAsync(final Node node, final Message request, InvokeCallback callback, Executor executor, final long timeoutMs);
-}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/MockUtils.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/MockUtils.java
new file mode 100644
index 0000000..9edc931
--- /dev/null
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/MockUtils.java
@@ -0,0 +1,71 @@
+package org.apache.ignite.raft.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.network.NetworkCluster;
+import org.apache.ignite.network.NetworkMember;
+import org.apache.ignite.raft.PeerId;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.apache.ignite.raft.client.message.RaftClientMessageFactoryImpl.INSTANCE;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+
+public class MockUtils {
+    public static PeerId LEADER = new PeerId(new NetworkMember("test"));
+
+    public static class TestInput1 {
+    }
+
+    public static class TestOutput1 {
+    }
+
+    public static class TestInput2 {
+    }
+
+    public static class TestOutput2 {
+    }
+
+    public static void mockUserInput1(NetworkCluster cluster) {
+        Mockito.doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
+                RaftClientMessages.UserResponse resp = INSTANCE.createUserResponse().setResponse(new TestOutput1()).build();
+
+                return CompletableFuture.completedFuture(resp);
+            }
+        }).when(cluster).sendWithResponse(eq(LEADER.getNode()), argThat(new ArgumentMatcher<RaftClientMessages.UserRequest>() {
+            @Override public boolean matches(RaftClientMessages.UserRequest arg) {
+                return arg.request() instanceof TestInput1;
+            }
+        }), anyLong());
+    }
+
+    public static void mockUserInput2(NetworkCluster cluster) {
+        Mockito.doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
+                RaftClientMessages.UserResponse resp = INSTANCE.createUserResponse().setResponse(new TestOutput2()).build();
+
+                return CompletableFuture.completedFuture(resp);
+            }
+        }).when(cluster).sendWithResponse(eq(LEADER.getNode()), argThat(new ArgumentMatcher<RaftClientMessages.UserRequest>() {
+            @Override public boolean matches(RaftClientMessages.UserRequest arg) {
+                return arg.request() instanceof TestInput2;
+            }
+        }), anyLong());
+    }
+
+    public static void mockLeaderRequest(NetworkCluster cluster, boolean timeout) {
+        Mockito.doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
+                RaftClientMessages.GetLeaderResponse resp = INSTANCE.createGetLeaderResponse().setLeaderId(LEADER).build();
+
+                return timeout ? CompletableFuture.failedFuture(new TimeoutException()) : CompletableFuture.completedFuture(resp);
+            }
+        }).when(cluster).sendWithResponse(eq(LEADER.getNode()), any(RaftClientMessages.GetLeaderRequest.class), anyLong());
+    }
+}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClientTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClientTest.java
index 5247c59..8ab61e3 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClientTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClientTest.java
@@ -2,62 +2,56 @@ package org.apache.ignite.raft.client.rpc;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
+import org.apache.ignite.network.NetworkCluster;
 import org.apache.ignite.raft.PeerId;
-import org.apache.ignite.raft.State;
-import org.apache.ignite.raft.client.RaftClientMessages.GetLeaderRequest;
+import org.apache.ignite.raft.client.MockUtils.TestInput1;
+import org.apache.ignite.raft.client.MockUtils.TestInput2;
+import org.apache.ignite.raft.client.MockUtils.TestOutput1;
+import org.apache.ignite.raft.client.MockUtils.TestOutput2;
+import org.apache.ignite.raft.client.RaftClientMessages;
 import org.apache.ignite.raft.client.rpc.impl.RaftGroupRpcClientImpl;
-import org.apache.ignite.raft.rpc.InvokeCallback;
-import org.apache.ignite.raft.rpc.Message;
-import org.apache.ignite.raft.rpc.NodeImpl;
-import org.apache.ignite.raft.rpc.RaftGroupMessage;
-import org.apache.ignite.raft.rpc.RpcClient;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.stubbing.Answer;
 
 import static java.util.Collections.singleton;
-import static org.apache.ignite.raft.client.message.RaftClientMessageBuilderFactory.INSTANCE;
+import static org.apache.ignite.raft.client.MockUtils.LEADER;
+import static org.apache.ignite.raft.client.MockUtils.mockLeaderRequest;
+import static org.apache.ignite.raft.client.MockUtils.mockUserInput1;
+import static org.apache.ignite.raft.client.MockUtils.mockUserInput2;
+import static org.apache.ignite.raft.client.message.RaftClientMessageFactoryImpl.INSTANCE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
 
 @ExtendWith(MockitoExtension.class)
 public class RaftGroupRpcClientTest {
     @Mock
-    private RpcClient rpcClient;
-
-    private static PeerId leader = new PeerId(new NodeImpl("test"));
+    private NetworkCluster cluster;
 
     @Test
     public void testRefreshLeader() throws Exception {
         String groupId = "test";
 
-        mockLeaderRequest(false);
+        mockLeaderRequest(cluster, false);
 
-        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, INSTANCE, 5_000, singleton(leader.getNode()));
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(cluster, INSTANCE, 5_000, singleton(LEADER.getNode()));
 
         PeerId leaderId = client.refreshLeader(groupId).get();
 
-        assertEquals(leader, client.state(groupId).leader());
-        assertEquals(leader, leaderId);
+        assertEquals(LEADER, client.state(groupId).leader());
+        assertEquals(LEADER, leaderId);
     }
 
     @Test
     public void testRefreshLeaderMultithreaded() throws Exception {
         String groupId = "test";
 
-        mockLeaderRequest(false);
+        mockLeaderRequest(cluster, false);
 
-        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, INSTANCE, 5_000, singleton(leader.getNode()));
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(cluster, INSTANCE, 5_000, singleton(LEADER.getNode()));
 
         int cnt = 20;
 
@@ -79,8 +73,8 @@ public class RaftGroupRpcClientTest {
                     try {
                         PeerId leaderId = client.refreshLeader(groupId).get();
 
-                        assertEquals(leader, client.state(groupId).leader());
-                        assertEquals(leader, leaderId);
+                        assertEquals(LEADER, client.state(groupId).leader());
+                        assertEquals(LEADER, leaderId);
                     }
                     catch (Exception e) {
                         fail(e);
@@ -99,10 +93,9 @@ public class RaftGroupRpcClientTest {
     public void testRefreshLeaderTimeout() throws Exception {
         String groupId = "test";
 
-        mockLeaderRequest(true);
+        mockLeaderRequest(cluster, true);
 
-        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, INSTANCE,
-            5_000, singleton(leader.getNode()));
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(cluster, INSTANCE, 5_000, singleton(LEADER.getNode()));
 
         try {
             client.refreshLeader(groupId).get();
@@ -118,66 +111,23 @@ public class RaftGroupRpcClientTest {
     public void testCustomMessage() throws Exception {
         String groupId = "test";
 
-        mockLeaderRequest(false);
-        mockCustomRequest();
-
-        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, INSTANCE,
-            5_000, singleton(leader.getNode()));
-
-        JunkRequest req = new JunkRequest(groupId);
-
-        Message resp = client.sendCustom(req).get();
-
-        State state = client.state(groupId);
-
-        // Expecting raft group state to be transparently loaded on first request.
-        assertEquals(leader, state.leader());
-
-        assertTrue(resp instanceof JunkResponse);
-    }
-
-    private static class JunkRequest implements RaftGroupMessage {
-        private final String groupId;
-
-        JunkRequest(String groupId) {
-            this.groupId = groupId;
-        }
+        mockLeaderRequest(cluster, false);
+        mockUserInput1(cluster);
+        mockUserInput2(cluster);
 
-        @Override public String getGroupId() {
-            return groupId;
-        }
-    }
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(cluster, INSTANCE, 5_000, singleton(LEADER.getNode()));
 
-    private static class JunkResponse implements Message {}
+        RaftClientMessages.UserRequest req1 =
+            client.factory().createUserRequest().setGroupId(groupId).setRequest(new TestInput1()).build();
 
-    private void mockCustomRequest() {
-        Mockito.doAnswer(new Answer() {
-            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
-                InvokeCallback callback = invocation.getArgument(2);
-                Executor executor = invocation.getArgument(3);
+        assertTrue(client.sendUserRequest(req1).get().response() instanceof TestOutput1);
 
-                executor.execute(() -> callback.complete(new JunkResponse(), null));
+        RaftClientMessages.UserRequest req2 =
+            client.factory().createUserRequest().setGroupId(groupId).setRequest(new TestInput2()).build();
 
-                return null;
-            }
-        }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(JunkRequest.class), any(), any(), anyLong());
-    }
+        assertTrue(client.sendUserRequest(req2).get().response() instanceof TestOutput2);
 
-    private void mockLeaderRequest(boolean timeout) {
-        Mockito.doAnswer(new Answer() {
-            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
-                InvokeCallback callback = invocation.getArgument(2);
-                Executor executor = invocation.getArgument(3);
-
-                executor.execute(() -> {
-                    if (timeout)
-                        callback.complete(null, new TimeoutException());
-                    else
-                        callback.complete(INSTANCE.createGetLeaderResponse().setLeaderId(leader).build(), null);
-                });
-
-                return null;
-            }
-        }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(GetLeaderRequest.class), any(), any(), anyLong());
+        // Expecting raft group state to be transparently loaded on first request.
+        assertEquals(LEADER, client.state(groupId).leader());
     }
 }
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestServiceTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestServiceTest.java
index 56e3ff8..1c802a0 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestServiceTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestServiceTest.java
@@ -1,51 +1,43 @@
 package org.apache.ignite.raft.client.service;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import org.apache.ignite.raft.PeerId;
-import org.apache.ignite.raft.client.RaftClientMessages;
-import org.apache.ignite.raft.client.RaftClientMessages.UserRequest;
+import org.apache.ignite.network.NetworkCluster;
+import org.apache.ignite.raft.client.MockUtils.TestInput1;
+import org.apache.ignite.raft.client.MockUtils.TestInput2;
+import org.apache.ignite.raft.client.MockUtils.TestOutput1;
+import org.apache.ignite.raft.client.MockUtils.TestOutput2;
 import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
 import org.apache.ignite.raft.client.rpc.impl.RaftGroupRpcClientImpl;
 import org.apache.ignite.raft.client.service.impl.RaftGroupClientRequestServiceImpl;
-import org.apache.ignite.raft.rpc.InvokeCallback;
-import org.apache.ignite.raft.rpc.NodeImpl;
-import org.apache.ignite.raft.rpc.RpcClient;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.stubbing.Answer;
 
 import static java.util.Collections.singleton;
-import static org.apache.ignite.raft.client.message.RaftClientMessageBuilderFactory.INSTANCE;
+import static org.apache.ignite.raft.client.MockUtils.LEADER;
+import static org.apache.ignite.raft.client.MockUtils.mockLeaderRequest;
+import static org.apache.ignite.raft.client.MockUtils.mockUserInput1;
+import static org.apache.ignite.raft.client.MockUtils.mockUserInput2;
+import static org.apache.ignite.raft.client.message.RaftClientMessageFactoryImpl.INSTANCE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.ArgumentMatchers.eq;
 
 @ExtendWith(MockitoExtension.class)
 public class RaftGroupClientRequestServiceTest {
     @Mock
-    private RpcClient rpcClient;
-
-    private static PeerId leader = new PeerId(new NodeImpl("test"));
+    private NetworkCluster cluster;
 
     @Test
     public void testUserRequest() throws Exception {
         String groupId = "test";
 
-        mockLeaderRequest();
-        mockUserRequest1();
-        mockUserRequest2();
+        mockLeaderRequest(cluster, false);
+        mockUserInput1(cluster);
+        mockUserInput2(cluster);
 
-        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, INSTANCE, 5_000, singleton(leader.getNode()));
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(cluster, INSTANCE, 5_000, singleton(LEADER.getNode()));
 
         RaftGroupClientRequestService service = new RaftGroupClientRequestServiceImpl(client, groupId);
 
@@ -63,70 +55,6 @@ public class RaftGroupClientRequestServiceTest {
 
         assertNotNull(output2);
 
-        assertEquals(leader, client.state(groupId).leader());
-    }
-
-    private static class TestInput1 {
-    }
-
-    private static class TestOutput1 {
-    }
-
-    private static class TestInput2 {
-    }
-
-    private static class TestOutput2 {
-    }
-
-    private void mockLeaderRequest() {
-        Mockito.doAnswer(new Answer() {
-            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
-                InvokeCallback callback = invocation.getArgument(2);
-                Executor executor = invocation.getArgument(3);
-
-                executor.execute(() -> {
-                    callback.complete(INSTANCE.createGetLeaderResponse().setLeaderId(leader).build(), null);
-                });
-
-                return null;
-            }
-        }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(RaftClientMessages.GetLeaderRequest.class),
-            any(), any(), anyLong());
-    }
-
-    private void mockUserRequest1() {
-        Mockito.doAnswer(new Answer() {
-            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
-                InvokeCallback callback = invocation.getArgument(2);
-                Executor executor = invocation.getArgument(3);
-
-                executor.execute(() -> callback.complete(INSTANCE.createUserResponse().
-                    setResponse(new TestOutput1()).build(), null));
-
-                return null;
-            }
-        }).when(rpcClient).invokeAsync(eq(leader.getNode()), argThat(new ArgumentMatcher<UserRequest>() {
-            @Override public boolean matches(UserRequest argument) {
-                return argument.request() instanceof TestInput1;
-            }
-        }), any(), any(), anyLong());
-    }
-
-    private void mockUserRequest2() {
-        Mockito.doAnswer(new Answer() {
-            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
-                InvokeCallback callback = invocation.getArgument(2);
-                Executor executor = invocation.getArgument(3);
-
-                executor.execute(() -> callback.complete(INSTANCE.createUserResponse().
-                    setResponse(new TestOutput2()).build(), null));
-
-                return null;
-            }
-        }).when(rpcClient).invokeAsync(eq(leader.getNode()), argThat(new ArgumentMatcher<UserRequest>() {
-            @Override public boolean matches(UserRequest argument) {
-                return argument.request() instanceof TestInput2;
-            }
-        }), any(), any(), anyLong());
+        assertEquals(LEADER, client.state(groupId).leader());
     }
 }


[ignite-3] 01/02: Merge branch 'main' of https://gitbox.apache.org/repos/asf/ignite-3 into ignite-14149

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch ignite-14149
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit b22ccf8a11de449da6796bffc7c2fe90ddba8fb8
Merge: c4871ce c548a82
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Mon Mar 1 11:06:30 2021 +0300

    Merge branch 'main' of https://gitbox.apache.org/repos/asf/ignite-3 into ignite-14149
    
    # Conflicts:
    #	pom.xml

 .../processor/internal/ITProcessorTest.java        |  12 +-
 .../configuration/processor/internal/Types.java    |   3 +
 .../internal/TestConfigurationSchema.java          |   3 +
 .../processor/internal/Processor.java              | 138 +++++++---
 .../internal/util/ConfigurationUtilTest.java       |  32 ++-
 .../sample/ConfigurationArrayTest.java             | 105 ++++++++
 .../sample/storage/ConfigurationChangerTest.java   |  69 ++---
 .../sample/storage/TestConfigurationStorage.java   |   6 +-
 .../ignite/configuration/ConfigurationChanger.java | 277 ++++++++++++---------
 .../configuration/ConfigurationRegistry.java       |  20 ++
 .../org/apache/ignite/configuration/RootKey.java   |  13 +-
 .../apache/ignite/configuration/RootKeyImpl.java   |  56 +++++
 .../internal/util/ConfigurationUtil.java           | 130 +++++++++-
 .../storage/ConfigurationStorage.java              |   2 +-
 .../apache/ignite/configuration/storage/Data.java  |   6 +-
 .../ignite/configuration/tree/InnerNode.java       |   2 +-
 modules/network/pom.xml                            |  98 ++++++++
 .../ITScaleCubeNetworkClusterMessagingTest.java    |  97 ++++++++
 .../ignite/network/scalecube/TestMessage.java}     |  62 +++--
 .../scalecube/TestNetworkHandlersProvider.java     |  63 +++++
 .../ignite/network/MessageHandlerHolder.java       |  59 +++++
 .../org/apache/ignite/network/NetworkCluster.java  |  70 ++++++
 .../network/NetworkClusterEventHandler.java}       |  32 +--
 .../ignite/network/NetworkClusterFactory.java      |  75 ++++++
 .../ignite/network/NetworkHandlersProvider.java}   |  31 +--
 .../org/apache/ignite/network/NetworkMember.java   |  63 +++++
 .../org/apache/ignite/network/NetworkMessage.java  |  59 +++++
 .../ignite/network/NetworkMessageHandler.java}     |  15 +-
 .../network/scalecube/ScaleCubeMemberResolver.java |  64 +++++
 .../network/scalecube/ScaleCubeMessageHandler.java | 100 ++++++++
 .../network/scalecube/ScaleCubeNetworkCluster.java | 110 ++++++++
 parent/pom.xml                                     |   7 +
 pom.xml                                            |   1 +
 33 files changed, 1606 insertions(+), 274 deletions(-)

diff --cc pom.xml
index e3afc38,ade1018..ed77364
--- a/pom.xml
+++ b/pom.xml
@@@ -40,7 -40,7 +40,8 @@@
          <module>modules/configuration-annotation-processor</module>
          <module>modules/rest</module>
          <module>modules/runner</module>
 +        <module>modules/raft-client</module>
+         <module>modules/network</module>
      </modules>
  
      <build>