You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/04/02 16:00:00 UTC

[ignite-3] branch main updated: IGNITE-14460 Single-node RAFT server (#80).

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ed2b820  IGNITE-14460 Single-node RAFT server (#80).
ed2b820 is described below

commit ed2b820a89478b3cda2b2e73181a1b44e32094c8
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Fri Apr 2 18:57:47 2021 +0300

    IGNITE-14460 Single-node RAFT server (#80).
---
 .../ITScaleCubeNetworkClusterMessagingTest.java    |   2 +-
 .../scalecube/TestNetworkHandlersProvider.java     |   2 +-
 .../org/apache/ignite/network/NetworkCluster.java  |  13 +-
 .../org/apache/ignite/network/NetworkMember.java   |   3 +-
 .../ignite/network/NetworkMessageHandler.java      |   4 +-
 .../message/DefaultMessageMapperProvider.java      |  54 +++++
 .../network/scalecube/ScaleCubeMessageHandler.java |   3 +-
 .../network/scalecube/ScaleCubeNetworkCluster.java |  16 +-
 .../org/apache/ignite/raft/client/Command.java     |   4 +-
 .../java/org/apache/ignite/raft/client/Peer.java   |   3 +-
 .../apache/ignite/raft/client/RaftErrorCode.java   |  10 +-
 .../ignite/raft/client/message/ActionRequest.java  |   3 +-
 .../ignite/raft/client/message/ActionResponse.java |   3 +-
 .../raft/client/message/AddLearnersRequest.java    |   3 +-
 .../raft/client/message/AddPeersRequest.java       |   3 +-
 .../raft/client/message/ChangePeersResponse.java   |   3 +-
 .../raft/client/message/GetLeaderRequest.java      |   3 +-
 .../raft/client/message/GetLeaderResponse.java     |   3 +-
 .../raft/client/message/GetPeersRequest.java       |   3 +-
 .../raft/client/message/GetPeersResponse.java      |   3 +-
 .../{impl => }/RaftClientMessageFactory.java       |  17 +-
 .../raft/client/message/RaftErrorResponse.java     |   3 +-
 .../raft/client/message/RemoveLearnersRequest.java |   3 +-
 .../raft/client/message/RemovePeersRequest.java    |   3 +-
 .../raft/client/message/SnapshotRequest.java       |   3 +-
 .../client/message/TransferLeadershipRequest.java  |   3 +-
 .../message/impl/RaftClientMessageFactoryImpl.java |   1 +
 ...oupCommandListener.java => CommandClosure.java} |  26 +-
 .../client/service/RaftGroupCommandListener.java   |   4 +-
 .../client/service/impl/RaftGroupServiceImpl.java  |  10 +-
 .../raft/client/service/RaftGroupServiceTest.java  |   6 +-
 modules/raft/pom.xml                               |  69 ++++++
 .../ignite/raft/server/CounterCommandListener.java |  53 +++++
 .../ignite/raft/server/GetValueCommand.java}       |  10 +-
 .../raft/server/ITRaftCounterServerTest.java       | 186 +++++++++++++++
 .../raft/server/IncrementAndGetCommand.java}       |  26 +-
 .../org/apache/ignite/raft/server/RaftServer.java  |  56 +++++
 .../ignite/raft/server/impl/RaftServerImpl.java    | 264 +++++++++++++++++++++
 pom.xml                                            |   1 +
 39 files changed, 812 insertions(+), 73 deletions(-)

diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java
index f8c03ec..792bd6f 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java
@@ -76,7 +76,7 @@ class ITScaleCubeNetworkClusterMessagingTest {
         final NetworkHandlersProvider messageWaiter = new NetworkHandlersProvider() {
             /** {@inheritDoc} */
             @Override public NetworkMessageHandler messageHandler() {
-                return message -> {
+                return (message, sender, corellationId) -> {
                     latch.countDown();
                 };
             }
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java
index ef77ec9..6ef82d8 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java
@@ -41,7 +41,7 @@ class TestNetworkHandlersProvider implements NetworkHandlersProvider {
 
     /** {@inheritDoc} */
     @Override public NetworkMessageHandler messageHandler() {
-        return event -> {
+        return (event, sender, corellationId) -> {
             MESSAGE_STORAGE.put(localName, event);
 
             System.out.println(localName + " handled messages : " + event);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
index 1b702be..a03f084 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
@@ -64,6 +64,17 @@ public interface NetworkCluster {
     Future<Void> send(NetworkMember member, NetworkMessage msg);
 
     /**
+     * Try to send the message asynchronously to the specific member with next guarantees:
+     * * Messages which was sent from one thread to one member will be delivered in the same order as they were sent.
+     * * If message N was successfully delivered to the member that means all messages preceding N also were successfully delivered.
+     *
+     * @param member Network member which should receive the message.
+     * @param msg Message which should be delivered.
+     * @param corellationId Corellation id when replying to the request.
+     */
+    Future<?> send(NetworkMember member, NetworkMessage msg, String corellationId);
+
+    /**
      * Sends a message asynchronously with same guarantees as for {@link #send(NetworkMember, NetworkMessage)} and
      * returns a response.
      *
@@ -73,7 +84,7 @@ public interface NetworkCluster {
      * @param <R> Expected response type.
      * @return A future holding the response or error if the expected response was not received.
      */
-    CompletableFuture<NetworkMessage> sendWithResponse(NetworkMember member, NetworkMessage msg, long timeout);
+    CompletableFuture<NetworkMessage> invoke(NetworkMember member, NetworkMessage msg, long timeout);
 
     /**
      * Add provider which allows to get configured handlers for different cluster events(ex. received message).
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkMember.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkMember.java
index 9c5e2bc..41ede74 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkMember.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkMember.java
@@ -16,12 +16,13 @@
  */
 package org.apache.ignite.network;
 
+import java.io.Serializable;
 import java.util.Objects;
 
 /**
  * Representation of the network member.
  */
-public class NetworkMember {
+public class NetworkMember implements Serializable {
     /** Unique name of member in cluster. */
     private final String name;
 
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
index 130af86..9e4e784 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
@@ -24,6 +24,8 @@ import org.apache.ignite.network.message.NetworkMessage;
 public interface NetworkMessageHandler {
     /**
      * @param message Message which was received from cluster.
+     * @param sender Sender.
+     * @param corellationId Corellation id.
      */
-    void onReceived(NetworkMessage message);
+    void onReceived(NetworkMessage message, NetworkMember sender, String corellationId);
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/message/DefaultMessageMapperProvider.java b/modules/network/src/main/java/org/apache/ignite/network/message/DefaultMessageMapperProvider.java
new file mode 100644
index 0000000..32e8110
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/message/DefaultMessageMapperProvider.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.message;
+
+import java.io.IOException;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageMapperProvider;
+import org.apache.ignite.network.message.MessageMappingException;
+import org.apache.ignite.network.message.MessageSerializer;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Uses JDK serialization.
+ */
+public class DefaultMessageMapperProvider implements MessageMapperProvider<NetworkMessage> {
+    /** {@inheritDoc} */
+    @Override public MessageDeserializer<NetworkMessage> createDeserializer() {
+        return reader -> {
+            try {
+                return (NetworkMessage)reader.stream().readObject();
+            }
+            catch (Exception e) {
+                throw new MessageMappingException("Failed to deserialize", e);
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public MessageSerializer<NetworkMessage> createSerializer() {
+        return (message, writer) -> {
+            try {
+                writer.stream().writeObject(message);
+            }
+            catch (IOException e) {
+                throw new MessageMappingException("Failed to serialize", e);
+            }
+        };
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java
index 4c32e4b..2e66249 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java
@@ -65,7 +65,8 @@ public class ScaleCubeMessageHandler implements ClusterMessageHandler {
     @Override public void onMessage(Message message) {
         for (NetworkMessageHandler handler : messageHandlerHolder.messageHandlers()) {
             NetworkMessage msg = message.data();
-            handler.onReceived(msg);
+            NetworkMember sender = memberForAddress(message.sender());
+            handler.onReceived(msg, sender, message.correlationId());
         }
     }
 
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
index e83e798..f176dc2 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
@@ -19,6 +19,7 @@ package org.apache.ignite.network.scalecube;
 import io.scalecube.cluster.Cluster;
 import io.scalecube.cluster.transport.api.Message;
 import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
@@ -30,6 +31,7 @@ import org.apache.ignite.network.NetworkMember;
 import org.apache.ignite.network.message.NetworkMessage;
 import org.apache.ignite.network.NetworkMessageHandler;
 
+import static io.scalecube.cluster.transport.api.Message.fromData;
 import static java.time.Duration.ofMillis;
 import static org.apache.ignite.network.scalecube.ScaleCubeMessageCodec.HEADER_MESSAGE_TYPE;
 
@@ -88,12 +90,20 @@ public class ScaleCubeNetworkCluster implements NetworkCluster {
 
     /** {@inheritDoc} */
     @Override public Future<Void> send(NetworkMember member, NetworkMessage msg) {
-        return cluster.send(memberResolver.resolveMember(member), fromNetworkMessage(msg)).toFuture();
+        return cluster.send(memberResolver.resolveMember(member), fromData(msg)).toFuture();
+    }
+
+    @Override public Future<?> send(NetworkMember member, NetworkMessage msg, String corellationId) {
+        return cluster.send(memberResolver.resolveMember(member),
+            Message.withData(msg).header(HEADER_MESSAGE_TYPE, String.valueOf(msg.directType())).
+                correlationId(corellationId).build()).toFuture();
     }
 
     /** {@inheritDoc} */
-    @Override public CompletableFuture<NetworkMessage> sendWithResponse(NetworkMember member, NetworkMessage msg, long timeout) {
-        return cluster.requestResponse(memberResolver.resolveMember(member), fromNetworkMessage(msg))
+    @Override public CompletableFuture<NetworkMessage> invoke(NetworkMember member, NetworkMessage msg, long timeout) {
+        return cluster.requestResponse(memberResolver.resolveMember(member),
+            Message.withData(msg).correlationId(UUID.randomUUID().toString()).
+                header(HEADER_MESSAGE_TYPE, String.valueOf(msg.directType())).build())
             .timeout(ofMillis(timeout)).toFuture().thenApply(m -> m.data());
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java
index 3d4cef6..8e2e650 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.raft.client;
 
+import java.io.Serializable;
+
 /**
  * A marker interface for replication group command.
  */
-public interface Command {
+public interface Command extends Serializable {
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java
index d8b94d3..33ecc25 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java
@@ -17,12 +17,13 @@
 
 package org.apache.ignite.raft.client;
 
+import java.io.Serializable;
 import org.apache.ignite.network.NetworkMember;
 
 /**
  * A participant of a replication group.
  */
-public final class Peer {
+public final class Peer implements Serializable {
     /**
      * Network node.
      */
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftErrorCode.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftErrorCode.java
index 0346ed7..3784d1b 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftErrorCode.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftErrorCode.java
@@ -25,10 +25,16 @@ public enum RaftErrorCode {
     SUCCESS(1000, "Successful"),
 
     /** */
-    NO_LEADER(1001, "No leader found within a timeout"),
+    NO_LEADER(1001, "No leader is found within a timeout"),
 
     /** */
-    LEADER_CHANGED(1002, "A peer is no longer a leader");
+    LEADER_CHANGED(1002, "A peer is no longer a leader"),
+
+    /** */
+    ILLEGAL_STATE(1003, "A peer is in illegal state"),
+
+    /** */
+    BUSY(1004, "A peer is busy, retry later");
 
     /** */
     private final int code;
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionRequest.java
index df2c103..ff1c140 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionRequest.java
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.raft.client.message;
 
+import java.io.Serializable;
 import org.apache.ignite.network.message.NetworkMessage;
 import org.apache.ignite.raft.client.Command;
 
 /**
  * Submit an action to a replication group.
  */
-public interface ActionRequest<T> extends NetworkMessage {
+public interface ActionRequest<T> extends NetworkMessage, Serializable {
     /**
      * @return Group id.
      */
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionResponse.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionResponse.java
index 8b187b8..40986c1 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionResponse.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionResponse.java
@@ -17,12 +17,13 @@
 
 package org.apache.ignite.raft.client.message;
 
+import java.io.Serializable;
 import org.apache.ignite.network.message.NetworkMessage;
 
 /**
  * The result of an action.
  */
-public interface ActionResponse<T> extends NetworkMessage {
+public interface ActionResponse<T> extends NetworkMessage, Serializable {
     /**
      * @return A result for this request, can be of any type.
      */
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequest.java
index 91adb59..3e79531 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.raft.client.message;
 
+import java.io.Serializable;
 import java.util.List;
 import org.apache.ignite.network.message.NetworkMessage;
 import org.apache.ignite.raft.client.Peer;
@@ -24,7 +25,7 @@ import org.apache.ignite.raft.client.Peer;
 /**
  * Add learners.
  */
-public interface AddLearnersRequest extends NetworkMessage {
+public interface AddLearnersRequest extends NetworkMessage, Serializable {
     /**
      * @return Group id.
      */
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeersRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeersRequest.java
index 6851dc3..7883f02 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeersRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeersRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.raft.client.message;
 
+import java.io.Serializable;
 import java.util.List;
 import org.apache.ignite.network.message.NetworkMessage;
 import org.apache.ignite.raft.client.Peer;
@@ -24,7 +25,7 @@ import org.apache.ignite.raft.client.Peer;
 /**
  * Add peers.
  */
-public interface AddPeersRequest extends NetworkMessage {
+public interface AddPeersRequest extends NetworkMessage, Serializable {
     /**
      * @return Group id.
      */
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeersResponse.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeersResponse.java
index 3b63490..72f171b 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeersResponse.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeersResponse.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.raft.client.message;
 
+import java.io.Serializable;
 import java.util.List;
 import org.apache.ignite.network.message.NetworkMessage;
 import org.apache.ignite.raft.client.Peer;
@@ -24,7 +25,7 @@ import org.apache.ignite.raft.client.Peer;
 /**
  * Change peers result.
  */
-public interface ChangePeersResponse extends NetworkMessage {
+public interface ChangePeersResponse extends NetworkMessage, Serializable {
     /**
      * @return Old peers.
      */
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequest.java
index c552033..f4cb03b 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequest.java
@@ -17,12 +17,13 @@
 
 package org.apache.ignite.raft.client.message;
 
+import java.io.Serializable;
 import org.apache.ignite.network.message.NetworkMessage;
 
 /**
  * Get leader.
  */
-public interface GetLeaderRequest extends NetworkMessage {
+public interface GetLeaderRequest extends NetworkMessage, Serializable {
     /**
      * @return Group id.
      */
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderResponse.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderResponse.java
index a1b0e1e..c2a01ee 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderResponse.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderResponse.java
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.raft.client.message;
 
+import java.io.Serializable;
 import org.apache.ignite.network.message.NetworkMessage;
 import org.apache.ignite.raft.client.Peer;
 
 /**
  * A current leader.
  */
-public interface GetLeaderResponse extends NetworkMessage {
+public interface GetLeaderResponse extends NetworkMessage, Serializable {
     /**
      * @return The leader.
      */
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequest.java
index 65de645..e0d94f5 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequest.java
@@ -17,10 +17,11 @@
 
 package org.apache.ignite.raft.client.message;
 
+import java.io.Serializable;
 import org.apache.ignite.network.message.NetworkMessage;
 
 /** Get peers. */
-public interface GetPeersRequest extends NetworkMessage {
+public interface GetPeersRequest extends NetworkMessage, Serializable {
     /**
      * @return Group id.
      */
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersResponse.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersResponse.java
index e8c4e9f..78157a6 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersResponse.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersResponse.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.raft.client.message;
 
+import java.io.Serializable;
 import java.util.List;
 import org.apache.ignite.network.message.NetworkMessage;
 import org.apache.ignite.raft.client.Peer;
@@ -24,7 +25,7 @@ import org.apache.ignite.raft.client.Peer;
 /**
  *
  */
-public interface GetPeersResponse extends NetworkMessage {
+public interface GetPeersResponse extends NetworkMessage, Serializable {
     /**
      * @return Current peers.
      */
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/impl/RaftClientMessageFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactory.java
similarity index 70%
rename from modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/impl/RaftClientMessageFactory.java
rename to modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactory.java
index 7512dec..9fe6a07 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/impl/RaftClientMessageFactory.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactory.java
@@ -15,22 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.raft.client.message.impl;
-
-import org.apache.ignite.raft.client.message.AddLearnersRequest;
-import org.apache.ignite.raft.client.message.AddPeersRequest;
-import org.apache.ignite.raft.client.message.ChangePeersResponse;
-import org.apache.ignite.raft.client.message.GetLeaderRequest;
-import org.apache.ignite.raft.client.message.GetLeaderResponse;
-import org.apache.ignite.raft.client.message.GetPeersRequest;
-import org.apache.ignite.raft.client.message.GetPeersResponse;
-import org.apache.ignite.raft.client.message.RaftErrorResponse;
-import org.apache.ignite.raft.client.message.RemoveLearnersRequest;
-import org.apache.ignite.raft.client.message.RemovePeersRequest;
-import org.apache.ignite.raft.client.message.SnapshotRequest;
-import org.apache.ignite.raft.client.message.TransferLeadershipRequest;
-import org.apache.ignite.raft.client.message.ActionRequest;
-import org.apache.ignite.raft.client.message.ActionResponse;
+package org.apache.ignite.raft.client.message;
 
 /**
  * A factory for immutable replication group messages.
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftErrorResponse.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftErrorResponse.java
index 1f90b9b..3b92c63 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftErrorResponse.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftErrorResponse.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.raft.client.message;
 
+import java.io.Serializable;
 import org.apache.ignite.network.message.NetworkMessage;
 import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.client.RaftErrorCode;
@@ -25,7 +26,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Raft error response. Also used as a default response when errorCode == {@link RaftErrorCode#SUCCESS}
  */
-public interface RaftErrorResponse extends NetworkMessage {
+public interface RaftErrorResponse extends NetworkMessage, Serializable {
     /**
      * @return Error code.
      */
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequest.java
index 99d235d..2ba2d9c 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.raft.client.message;
 
+import java.io.Serializable;
 import java.util.List;
 import org.apache.ignite.network.message.NetworkMessage;
 import org.apache.ignite.raft.client.Peer;
@@ -24,7 +25,7 @@ import org.apache.ignite.raft.client.Peer;
 /**
  * Remove learners.
  */
-public interface RemoveLearnersRequest extends NetworkMessage {
+public interface RemoveLearnersRequest extends NetworkMessage, Serializable {
     /**
      * @return Group id.
      */
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeersRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeersRequest.java
index 93e5f2f..488e226 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeersRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeersRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.raft.client.message;
 
+import java.io.Serializable;
 import java.util.List;
 import org.apache.ignite.network.message.NetworkMessage;
 import org.apache.ignite.raft.client.Peer;
@@ -24,7 +25,7 @@ import org.apache.ignite.raft.client.Peer;
 /**
  * Remove peers.
  */
-public interface RemovePeersRequest extends NetworkMessage {
+public interface RemovePeersRequest extends NetworkMessage, Serializable {
     /**
      * @return Group id.
      */
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequest.java
index 4767aec..df607ec 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequest.java
@@ -17,12 +17,13 @@
 
 package org.apache.ignite.raft.client.message;
 
+import java.io.Serializable;
 import org.apache.ignite.network.message.NetworkMessage;
 
 /**
  * Take a local snapshot on the peer.
  */
-public interface SnapshotRequest extends NetworkMessage {
+public interface SnapshotRequest extends NetworkMessage, Serializable {
     /**
      * @return Group id.
      */
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeadershipRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeadershipRequest.java
index f5cf876..e668f7c 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeadershipRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeadershipRequest.java
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.raft.client.message;
 
+import java.io.Serializable;
 import org.apache.ignite.network.message.NetworkMessage;
 import org.apache.ignite.raft.client.Peer;
 
 /**
  * Transfer a leadership to receiving peer.
  */
-public interface TransferLeadershipRequest extends NetworkMessage {
+public interface TransferLeadershipRequest extends NetworkMessage, Serializable {
     /**
      * @return Group id.
      */
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/impl/RaftClientMessageFactoryImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/impl/RaftClientMessageFactoryImpl.java
index ce52677..e0d2cd5 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/impl/RaftClientMessageFactoryImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/impl/RaftClientMessageFactoryImpl.java
@@ -24,6 +24,7 @@ import org.apache.ignite.raft.client.message.GetLeaderRequest;
 import org.apache.ignite.raft.client.message.GetLeaderResponse;
 import org.apache.ignite.raft.client.message.GetPeersRequest;
 import org.apache.ignite.raft.client.message.GetPeersResponse;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
 import org.apache.ignite.raft.client.message.RaftErrorResponse;
 import org.apache.ignite.raft.client.message.RemoveLearnersRequest;
 import org.apache.ignite.raft.client.message.RemovePeersRequest;
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupCommandListener.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/CommandClosure.java
similarity index 64%
copy from modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupCommandListener.java
copy to modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/CommandClosure.java
index d22c334..3adb047 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupCommandListener.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/CommandClosure.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,21 +17,27 @@
 
 package org.apache.ignite.raft.client.service;
 
-import java.util.Iterator;
-import org.apache.ignite.raft.client.ReadCommand;
-import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.Command;
 
 /**
- * A listener for replication group commands.
+ * A closure to notify abbout command processing outcome.
+ * @param <R> Command type.
  */
-public interface RaftGroupCommandListener {
+public interface CommandClosure<R extends Command> {
     /**
-     * @param iterator Read command iterator.
+     * @return The command.
      */
-    void onRead(Iterator<ReadCommand> iterator);
+    R command();
 
     /**
-     * @param iterator Write command iterator.
+     * Success outcome.
+     * @param res The result.
      */
-    void onWrite(Iterator<WriteCommand> iterator);
+    void success(Object res);
+
+    /**
+     * Failure outcome.
+     * @param err The error.
+     */
+    void failure(Throwable err);
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupCommandListener.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupCommandListener.java
index d22c334..6dd14be 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupCommandListener.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupCommandListener.java
@@ -28,10 +28,10 @@ public interface RaftGroupCommandListener {
     /**
      * @param iterator Read command iterator.
      */
-    void onRead(Iterator<ReadCommand> iterator);
+    void onRead(Iterator<CommandClosure<ReadCommand>> iterator);
 
     /**
      * @param iterator Write command iterator.
      */
-    void onWrite(Iterator<WriteCommand> iterator);
+    void onWrite(Iterator<CommandClosure<WriteCommand>> iterator);
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
index 4662a0c..825a8c6 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
@@ -46,7 +46,7 @@ import org.apache.ignite.raft.client.message.RemoveLearnersRequest;
 import org.apache.ignite.raft.client.message.RemovePeersRequest;
 import org.apache.ignite.raft.client.message.SnapshotRequest;
 import org.apache.ignite.raft.client.message.TransferLeadershipRequest;
-import org.apache.ignite.raft.client.message.impl.RaftClientMessageFactory;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.jetbrains.annotations.NotNull;
 
@@ -262,7 +262,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
     @Override public CompletableFuture<Void> snapshot(Peer peer) {
         SnapshotRequest req = factory.snapshotRequest().groupId(groupId).build();
 
-        CompletableFuture<?> fut = cluster.sendWithResponse(peer.getNode(), req, timeout);
+        CompletableFuture<?> fut = cluster.invoke(peer.getNode(), req, timeout);
 
         return fut.thenApply(resp -> null);
     }
@@ -276,7 +276,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
 
         TransferLeadershipRequest req = factory.transferLeaderRequest().groupId(groupId).peer(newLeader).build();
 
-        CompletableFuture<?> fut = cluster.sendWithResponse(newLeader.getNode(), req, timeout);
+        CompletableFuture<?> fut = cluster.invoke(newLeader.getNode(), req, timeout);
 
         return fut.thenApply(resp -> null);
     }
@@ -299,7 +299,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
     @Override public <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd) {
         ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).build();
 
-        CompletableFuture fut = cluster.sendWithResponse(peer.getNode(), req, timeout);
+        CompletableFuture fut = cluster.invoke(peer.getNode(), req, timeout);
 
         return fut.thenApply(resp -> ((ActionResponse) resp).result());
     }
@@ -307,7 +307,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
     private <R> CompletableFuture<R> sendWithRetry(NetworkMember node, NetworkMessage req, long stopTime) {
         if (currentTimeMillis() >= stopTime)
             return CompletableFuture.failedFuture(new TimeoutException());
-        return cluster.sendWithResponse(node, req, timeout)
+        return cluster.invoke(node, req, timeout)
             .thenCompose(resp -> {
                 if (resp instanceof RaftErrorResponse) {
                     RaftErrorResponse resp0 = (RaftErrorResponse)resp;
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
index b8867ed..7512e24 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
@@ -30,7 +30,7 @@ import org.apache.ignite.raft.client.RaftErrorCode;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.apache.ignite.raft.client.message.ActionRequest;
 import org.apache.ignite.raft.client.message.GetLeaderRequest;
-import org.apache.ignite.raft.client.message.impl.RaftClientMessageFactory;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
 import org.apache.ignite.raft.client.message.impl.RaftClientMessageFactoryImpl;
 import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
 import org.junit.jupiter.api.BeforeEach;
@@ -378,7 +378,7 @@ public class RaftGroupServiceTest {
 
                 return completedFuture(resp);
             }
-        }).when(cluster).sendWithResponse(any(), argThat(new ArgumentMatcher<ActionRequest>() {
+        }).when(cluster).invoke(any(), argThat(new ArgumentMatcher<ActionRequest>() {
             @Override public boolean matches(ActionRequest arg) {
                 return arg.command() instanceof TestCommand;
             }
@@ -408,7 +408,7 @@ public class RaftGroupServiceTest {
 
                 return completedFuture(resp);
             }
-        }).when(cluster).sendWithResponse(any(), any(GetLeaderRequest.class), anyLong());
+        }).when(cluster).invoke(any(), any(GetLeaderRequest.class), anyLong());
     }
 
     /** */
diff --git a/modules/raft/pom.xml b/modules/raft/pom.xml
new file mode 100644
index 0000000..483446f
--- /dev/null
+++ b/modules/raft/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent/pom.xml</relativePath>
+    </parent>
+
+    <artifactId>ignite-raft</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+
+    <dependencies>
+        <!-- logging -->
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-raft-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterCommandListener.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterCommandListener.java
new file mode 100644
index 0000000..854c8c7
--- /dev/null
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterCommandListener.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.server;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupCommandListener;
+
+/** */
+public class CounterCommandListener implements RaftGroupCommandListener {
+    /** */
+    private AtomicInteger counter = new AtomicInteger();
+
+    /** {@inheritDoc} */
+    @Override public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+        while (iterator.hasNext()) {
+            CommandClosure<ReadCommand> clo = iterator.next();
+
+            assert clo.command() instanceof GetValueCommand;
+
+            clo.success(counter.get());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+        while (iterator.hasNext()) {
+            CommandClosure<WriteCommand> clo = iterator.next();
+
+            IncrementAndGetCommand cmd0 = (IncrementAndGetCommand) clo.command();
+
+            clo.success(counter.addAndGet(cmd0.delta()));
+        }
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
similarity index 84%
copy from modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java
copy to modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
index 3d4cef6..96948a3 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.raft.client;
+package org.apache.ignite.raft.server;
 
-/**
- * A marker interface for replication group command.
- */
-public interface Command {
+import org.apache.ignite.raft.client.ReadCommand;
+
+/** */
+public class GetValueCommand implements ReadCommand {
 }
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
new file mode 100644
index 0000000..1378f0f
--- /dev/null
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.server;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.lang.LogWrapper;
+import org.apache.ignite.network.message.DefaultMessageMapperProvider;
+import org.apache.ignite.network.Network;
+import org.apache.ignite.network.NetworkCluster;
+import org.apache.ignite.network.scalecube.ScaleCubeMemberResolver;
+import org.apache.ignite.network.scalecube.ScaleCubeNetworkClusterFactory;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
+import org.apache.ignite.raft.client.message.impl.RaftClientMessageFactoryImpl;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.server.impl.RaftServerImpl;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** */
+class ITRaftCounterServerTest {
+    /** */
+    private static LogWrapper LOG = new LogWrapper(ITRaftCounterServerTest.class);
+
+    /** */
+    private static RaftClientMessageFactory FACTORY = new RaftClientMessageFactoryImpl();
+
+    /** */
+    private RaftServer server;
+
+    /** */
+    private static final String SERVER_ID = "testServer";
+
+    /** */
+    private static final String CLIENT_ID = "testClient";
+
+    /** */
+    private static final String COUNTER_GROUP_ID_0 = "counter0";
+
+    /** */
+    private static final String COUNTER_GROUP_ID_1 = "counter1";
+
+    /**
+     * @param testInfo Test info.
+     */
+    @BeforeEach
+    void before(TestInfo testInfo) throws Exception {
+        LOG.info(">>>> Starting test " + testInfo.getTestMethod().orElseThrow().getName());
+
+        server = new RaftServerImpl(SERVER_ID,
+            20100,
+            FACTORY,
+            1000,
+            Map.of(COUNTER_GROUP_ID_0, new CounterCommandListener(), COUNTER_GROUP_ID_1, new CounterCommandListener()));
+    }
+
+    /**
+     * @throws Exception
+     */
+    @AfterEach
+    void after() throws Exception {
+        server.shutdown();
+    }
+
+    /**
+     * @throws Exception
+     */
+    @Test
+    public void testRefreshLeader() throws Exception {
+        NetworkCluster client = startClient(CLIENT_ID, 20101, List.of("localhost:20100"));
+
+        assertTrue(waitForTopology(client, 2, 1000));
+
+        Peer server = new Peer(client.allMembers().stream().filter(m -> SERVER_ID.equals(m.name())).findFirst().orElseThrow());
+
+        RaftGroupService service = new RaftGroupServiceImpl(COUNTER_GROUP_ID_0, client, FACTORY, 1000,
+            List.of(server), true, 200);
+
+        Peer leader = service.leader();
+
+        assertNotNull(leader);
+        assertEquals(server.getNode().name(), leader.getNode().name());
+
+        client.shutdown();
+    }
+
+    /**
+     * @throws Exception
+     */
+    @Test
+    public void testCounterCommandListener() throws Exception {
+        NetworkCluster client = startClient(CLIENT_ID, 20101, List.of("localhost:20100"));
+
+        assertTrue(waitForTopology(client, 2, 1000));
+
+        Peer server = new Peer(client.allMembers().stream().filter(m -> SERVER_ID.equals(m.name())).findFirst().orElseThrow());
+
+        RaftGroupService service0 = new RaftGroupServiceImpl(COUNTER_GROUP_ID_0, client, FACTORY, 1000,
+            List.of(server), true, 200);
+
+        RaftGroupService service1 = new RaftGroupServiceImpl(COUNTER_GROUP_ID_1, client, FACTORY, 1000,
+            List.of(server), true, 200);
+
+        assertNotNull(service0.leader());
+        assertNotNull(service1.leader());
+
+        assertEquals(2, service0.<Integer>run(new IncrementAndGetCommand(2)).get());
+        assertEquals(2, service0.<Integer>run(new GetValueCommand()).get());
+        assertEquals(3, service0.<Integer>run(new IncrementAndGetCommand(1)).get());
+        assertEquals(3, service0.<Integer>run(new GetValueCommand()).get());
+
+        assertEquals(4, service1.<Integer>run(new IncrementAndGetCommand(4)).get());
+        assertEquals(4, service1.<Integer>run(new GetValueCommand()).get());
+        assertEquals(7, service1.<Integer>run(new IncrementAndGetCommand(3)).get());
+        assertEquals(7, service1.<Integer>run(new GetValueCommand()).get());
+
+        client.shutdown();
+    }
+
+    /**
+     * @param name Node name.
+     * @param port Local port.
+     * @param servers Server nodes of the cluster.
+     * @return The client cluster view.
+     */
+    private NetworkCluster startClient(String name, int port, List<String> servers) {
+        Network network = new Network(
+            new ScaleCubeNetworkClusterFactory(name, port, servers, new ScaleCubeMemberResolver())
+        );
+
+        network.registerMessageMapper((short)1000, new DefaultMessageMapperProvider());
+        network.registerMessageMapper((short)1001, new DefaultMessageMapperProvider());
+        network.registerMessageMapper((short)1005, new DefaultMessageMapperProvider());
+        network.registerMessageMapper((short)1006, new DefaultMessageMapperProvider());
+        network.registerMessageMapper((short)1009, new DefaultMessageMapperProvider());
+
+        return network.start();
+    }
+
+    /**
+     * @param cluster The cluster.
+     * @param expected Expected count.
+     * @param timeout The timeout in millis.
+     * @return {@code True} if topology size is equal to expected.
+     */
+    private boolean waitForTopology(NetworkCluster cluster, int expected, int timeout) {
+        long stop = System.currentTimeMillis() + timeout;
+
+        while(System.currentTimeMillis() < stop) {
+            if (cluster.allMembers().size() >= expected)
+                return true;
+
+            try {
+                Thread.sleep(50);
+            }
+            catch (InterruptedException e) {
+                return false;
+            }
+        }
+
+        return false;
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/IncrementAndGetCommand.java
similarity index 65%
copy from modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java
copy to modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/IncrementAndGetCommand.java
index 3d4cef6..81e1988 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/IncrementAndGetCommand.java
@@ -15,10 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.raft.client;
+package org.apache.ignite.raft.server;
 
-/**
- * A marker interface for replication group command.
- */
-public interface Command {
+import org.apache.ignite.raft.client.WriteCommand;
+
+/** */
+public class IncrementAndGetCommand implements WriteCommand {
+    /** */
+    private final int delta;
+
+    /**
+     * @param delta The delta.
+     */
+    public IncrementAndGetCommand(int delta) {
+        this.delta = delta;
+    }
+
+    /**
+     * @return The delta.
+     */
+    public int delta() {
+        return delta;
+    }
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/server/RaftServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/server/RaftServer.java
new file mode 100644
index 0000000..d6bcefd
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/server/RaftServer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.server;
+
+import org.apache.ignite.network.NetworkMember;
+import org.apache.ignite.raft.client.service.RaftGroupCommandListener;
+
+/**
+ * The RAFT protocol based replication server.
+ * <p>
+ * Supports multiple RAFT groups.
+ * <p>
+ * The server listens for client commands, submits them to a replicated log and calls {@link RaftGroupCommandListener}
+ * {@code onRead} and {@code onWrite} methods then after the command was committed to the log.
+ */
+public interface RaftServer {
+    /**
+     * @return Local member.
+     */
+    NetworkMember localMember();
+
+    /**
+     * Set a listener for group commands.
+     * @param groupId group id.
+     * @param lsnr Listener.
+     */
+    void setListener(String groupId, RaftGroupCommandListener lsnr);
+
+    /**
+     * Remove a command listener.
+     * @param groupId Group id.
+     */
+    void clearListener(String groupId);
+
+    /**
+     * Shutdown a server.
+     *
+     * @throws Exception
+     */
+    void shutdown() throws Exception;
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java
new file mode 100644
index 0000000..45c4283
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.server.impl;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.BiConsumer;
+import org.apache.ignite.lang.LogWrapper;
+import org.apache.ignite.network.message.DefaultMessageMapperProvider;
+import org.apache.ignite.network.Network;
+import org.apache.ignite.network.NetworkCluster;
+import org.apache.ignite.network.NetworkHandlersProvider;
+import org.apache.ignite.network.NetworkMember;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.network.scalecube.ScaleCubeMemberResolver;
+import org.apache.ignite.network.scalecube.ScaleCubeNetworkClusterFactory;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.RaftErrorCode;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.exception.RaftException;
+import org.apache.ignite.raft.client.message.ActionRequest;
+import org.apache.ignite.raft.client.message.GetLeaderRequest;
+import org.apache.ignite.raft.client.message.GetLeaderResponse;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
+import org.apache.ignite.raft.client.message.RaftErrorResponse;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupCommandListener;
+import org.apache.ignite.raft.server.RaftServer;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A single node server implementation.
+ */
+public class RaftServerImpl implements RaftServer {
+    /** */
+    private static LogWrapper LOG = new LogWrapper(RaftServerImpl.class);
+
+    /** */
+    private final String id;
+
+    /** */
+    private final int localPort;
+
+    /** */
+    private final RaftClientMessageFactory clientMsgFactory;
+
+    /** */
+    private final NetworkCluster server;
+
+    /** */
+    private final ConcurrentMap<String, RaftGroupCommandListener> listeners = new ConcurrentHashMap<>();
+
+    /** */
+    private final BlockingQueue<CommandClosureEx<ReadCommand>> readQueue;
+
+    /** */
+    private final BlockingQueue<CommandClosureEx<WriteCommand>> writeQueue;
+
+    /** */
+    private final Thread readWorker;
+
+    /** */
+    private final Thread writeWorker;
+
+    /**
+     * @param id Server id.
+     * @param localPort Local port.
+     * @param clientMsgFactory Client message factory.
+     * @param queueSize Queue size.
+     * @param listeners Command listeners.
+     */
+    public RaftServerImpl(
+        @NotNull String id,
+        int localPort,
+        @NotNull RaftClientMessageFactory clientMsgFactory,
+        int queueSize,
+        Map<String, RaftGroupCommandListener> listeners
+    ) {
+        Objects.requireNonNull(id);
+        Objects.requireNonNull(clientMsgFactory);
+
+        this.id = id;
+        this.localPort = localPort;
+        this.clientMsgFactory = clientMsgFactory;
+
+        if (listeners != null)
+            this.listeners.putAll(listeners);
+
+        readQueue = new ArrayBlockingQueue<CommandClosureEx<ReadCommand>>(queueSize);
+        writeQueue = new ArrayBlockingQueue<CommandClosureEx<WriteCommand>>(queueSize);
+
+        Network network = new Network(
+            new ScaleCubeNetworkClusterFactory(id, localPort, List.of(), new ScaleCubeMemberResolver())
+        );
+
+        network.registerMessageMapper((short)1000, new DefaultMessageMapperProvider());
+        network.registerMessageMapper((short)1001, new DefaultMessageMapperProvider());
+        network.registerMessageMapper((short)1005, new DefaultMessageMapperProvider());
+        network.registerMessageMapper((short)1006, new DefaultMessageMapperProvider());
+        network.registerMessageMapper((short)1009, new DefaultMessageMapperProvider());
+
+        server = network.start();
+
+        server.addHandlersProvider(new NetworkHandlersProvider() {
+            @Override public NetworkMessageHandler messageHandler() {
+                return new NetworkMessageHandler() {
+                    @Override public void onReceived(NetworkMessage req, NetworkMember sender, String corellationId) {
+
+                        if (req instanceof GetLeaderRequest) {
+                            GetLeaderResponse resp = clientMsgFactory.getLeaderResponse().leader(new Peer(server.localMember())).build();
+
+                            server.send(sender, resp, corellationId);
+                        }
+                        else if (req instanceof ActionRequest) {
+                            ActionRequest req0 = (ActionRequest) req;
+
+                            RaftGroupCommandListener lsnr = listeners.get(req0.groupId());
+
+                            if (lsnr == null) {
+                                sendError(sender, corellationId, RaftErrorCode.ILLEGAL_STATE);
+
+                                return;
+                            }
+
+                            if (req0.command() instanceof ReadCommand) {
+                                handleActionRequest(sender, req0, corellationId, readQueue, lsnr);
+                            }
+                            else {
+                                handleActionRequest(sender, req0, corellationId, writeQueue, lsnr);
+                            }
+                        }
+                        else {
+                            LOG.warn("Unsupported message class " + req.getClass().getName());
+                        }
+                    }
+                };
+            }
+        });
+
+        readWorker = new Thread(() -> processQueue(readQueue, (l, i) -> l.onRead(i)), "read-cmd-worker#" + id);
+        readWorker.setDaemon(true);
+        readWorker.start();
+
+        writeWorker = new Thread(() -> processQueue(writeQueue, (l, i) -> l.onWrite(i)), "write-cmd-worker#" + id);
+        writeWorker.setDaemon(true);
+        writeWorker.start();
+
+        LOG.info("Started replication server [id=" + id + ", localPort=" + localPort + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public NetworkMember localMember() {
+        return server.localMember();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setListener(String groupId, RaftGroupCommandListener lsnr) {
+        listeners.put(groupId, lsnr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clearListener(String groupId) {
+        listeners.remove(groupId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void shutdown() throws Exception {
+        server.shutdown();
+
+        readWorker.interrupt();
+        readWorker.join();
+
+        writeWorker.interrupt();
+        writeWorker.join();
+
+        LOG.info("Stopped replication server [id=" + id + ", localPort=" + localPort + ']');
+    }
+
+    private <T extends Command> void handleActionRequest(
+        NetworkMember sender,
+        ActionRequest req,
+        String corellationId,
+        BlockingQueue<CommandClosureEx<T>> queue,
+        RaftGroupCommandListener lsnr
+    ) {
+        if (!queue.offer(new CommandClosureEx<T>() {
+            @Override public RaftGroupCommandListener listener() {
+                return lsnr;
+            }
+
+            @Override public T command() {
+                return (T) req.command();
+            }
+
+            @Override public void success(Object res) {
+                server.send(sender, clientMsgFactory.actionResponse().result(res).build(), corellationId);
+            }
+
+            @Override public void failure(Throwable t) {
+                sendError(sender, corellationId, RaftErrorCode.ILLEGAL_STATE);
+            }
+        })) {
+            // Queue out of capacity.
+            sendError(sender, corellationId, RaftErrorCode.BUSY);
+
+            return;
+        }
+    }
+
+    private <T extends Command> void processQueue(
+        BlockingQueue<CommandClosureEx<T>> queue,
+        BiConsumer<RaftGroupCommandListener, Iterator<CommandClosure<T>>> clo
+    ) {
+        while (!Thread.interrupted()) {
+            try {
+                CommandClosureEx<T> cmdClo = queue.take();
+
+                RaftGroupCommandListener lsnr = cmdClo.listener();
+
+                if (lsnr == null)
+                    cmdClo.failure(new RaftException(RaftErrorCode.ILLEGAL_STATE));
+                else
+                    clo.accept(lsnr, List.<CommandClosure<T>>of(cmdClo).iterator());
+            }
+            catch (InterruptedException e) {
+                return;
+            }
+        }
+    }
+
+    private void sendError(NetworkMember sender, String corellationId, RaftErrorCode errorCode) {
+        RaftErrorResponse resp = clientMsgFactory.raftErrorResponse().errorCode(errorCode).build();
+
+        server.send(sender, resp, corellationId);
+    }
+
+    private interface CommandClosureEx<T extends Command> extends CommandClosure<T> {
+        RaftGroupCommandListener listener();
+    }
+}
diff --git a/pom.xml b/pom.xml
index a81b672..9fd5b78 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,6 +44,7 @@
         <module>modules/metastorage-client</module>
         <module>modules/metastorage-common</module>
         <module>modules/network</module>
+        <module>modules/raft</module>
         <module>modules/raft-client</module>
         <module>modules/rest</module>
         <module>modules/runner</module>