You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/04/02 16:00:00 UTC
[ignite-3] branch main updated: IGNITE-14460 Single-node RAFT
server (#80).
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ed2b820 IGNITE-14460 Single-node RAFT server (#80).
ed2b820 is described below
commit ed2b820a89478b3cda2b2e73181a1b44e32094c8
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Fri Apr 2 18:57:47 2021 +0300
IGNITE-14460 Single-node RAFT server (#80).
---
.../ITScaleCubeNetworkClusterMessagingTest.java | 2 +-
.../scalecube/TestNetworkHandlersProvider.java | 2 +-
.../org/apache/ignite/network/NetworkCluster.java | 13 +-
.../org/apache/ignite/network/NetworkMember.java | 3 +-
.../ignite/network/NetworkMessageHandler.java | 4 +-
.../message/DefaultMessageMapperProvider.java | 54 +++++
.../network/scalecube/ScaleCubeMessageHandler.java | 3 +-
.../network/scalecube/ScaleCubeNetworkCluster.java | 16 +-
.../org/apache/ignite/raft/client/Command.java | 4 +-
.../java/org/apache/ignite/raft/client/Peer.java | 3 +-
.../apache/ignite/raft/client/RaftErrorCode.java | 10 +-
.../ignite/raft/client/message/ActionRequest.java | 3 +-
.../ignite/raft/client/message/ActionResponse.java | 3 +-
.../raft/client/message/AddLearnersRequest.java | 3 +-
.../raft/client/message/AddPeersRequest.java | 3 +-
.../raft/client/message/ChangePeersResponse.java | 3 +-
.../raft/client/message/GetLeaderRequest.java | 3 +-
.../raft/client/message/GetLeaderResponse.java | 3 +-
.../raft/client/message/GetPeersRequest.java | 3 +-
.../raft/client/message/GetPeersResponse.java | 3 +-
.../{impl => }/RaftClientMessageFactory.java | 17 +-
.../raft/client/message/RaftErrorResponse.java | 3 +-
.../raft/client/message/RemoveLearnersRequest.java | 3 +-
.../raft/client/message/RemovePeersRequest.java | 3 +-
.../raft/client/message/SnapshotRequest.java | 3 +-
.../client/message/TransferLeadershipRequest.java | 3 +-
.../message/impl/RaftClientMessageFactoryImpl.java | 1 +
...oupCommandListener.java => CommandClosure.java} | 26 +-
.../client/service/RaftGroupCommandListener.java | 4 +-
.../client/service/impl/RaftGroupServiceImpl.java | 10 +-
.../raft/client/service/RaftGroupServiceTest.java | 6 +-
modules/raft/pom.xml | 69 ++++++
.../ignite/raft/server/CounterCommandListener.java | 53 +++++
.../ignite/raft/server/GetValueCommand.java} | 10 +-
.../raft/server/ITRaftCounterServerTest.java | 186 +++++++++++++++
.../raft/server/IncrementAndGetCommand.java} | 26 +-
.../org/apache/ignite/raft/server/RaftServer.java | 56 +++++
.../ignite/raft/server/impl/RaftServerImpl.java | 264 +++++++++++++++++++++
pom.xml | 1 +
39 files changed, 812 insertions(+), 73 deletions(-)
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java
index f8c03ec..792bd6f 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java
@@ -76,7 +76,7 @@ class ITScaleCubeNetworkClusterMessagingTest {
final NetworkHandlersProvider messageWaiter = new NetworkHandlersProvider() {
/** {@inheritDoc} */
@Override public NetworkMessageHandler messageHandler() {
- return message -> {
+ return (message, sender, corellationId) -> {
latch.countDown();
};
}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java
index ef77ec9..6ef82d8 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java
@@ -41,7 +41,7 @@ class TestNetworkHandlersProvider implements NetworkHandlersProvider {
/** {@inheritDoc} */
@Override public NetworkMessageHandler messageHandler() {
- return event -> {
+ return (event, sender, corellationId) -> {
MESSAGE_STORAGE.put(localName, event);
System.out.println(localName + " handled messages : " + event);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
index 1b702be..a03f084 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
@@ -64,6 +64,17 @@ public interface NetworkCluster {
Future<Void> send(NetworkMember member, NetworkMessage msg);
/**
+ * Try to send the message asynchronously to the specific member with next guarantees:
+ * * Messages which was sent from one thread to one member will be delivered in the same order as they were sent.
+ * * If message N was successfully delivered to the member that means all messages preceding N also were successfully delivered.
+ *
+ * @param member Network member which should receive the message.
+ * @param msg Message which should be delivered.
+ * @param corellationId Corellation id when replying to the request.
+ */
+ Future<?> send(NetworkMember member, NetworkMessage msg, String corellationId);
+
+ /**
* Sends a message asynchronously with same guarantees as for {@link #send(NetworkMember, NetworkMessage)} and
* returns a response.
*
@@ -73,7 +84,7 @@ public interface NetworkCluster {
* @param <R> Expected response type.
* @return A future holding the response or error if the expected response was not received.
*/
- CompletableFuture<NetworkMessage> sendWithResponse(NetworkMember member, NetworkMessage msg, long timeout);
+ CompletableFuture<NetworkMessage> invoke(NetworkMember member, NetworkMessage msg, long timeout);
/**
* Add provider which allows to get configured handlers for different cluster events(ex. received message).
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkMember.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkMember.java
index 9c5e2bc..41ede74 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkMember.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkMember.java
@@ -16,12 +16,13 @@
*/
package org.apache.ignite.network;
+import java.io.Serializable;
import java.util.Objects;
/**
* Representation of the network member.
*/
-public class NetworkMember {
+public class NetworkMember implements Serializable {
/** Unique name of member in cluster. */
private final String name;
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
index 130af86..9e4e784 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
@@ -24,6 +24,8 @@ import org.apache.ignite.network.message.NetworkMessage;
public interface NetworkMessageHandler {
/**
* @param message Message which was received from cluster.
+ * @param sender Sender.
+ * @param corellationId Corellation id.
*/
- void onReceived(NetworkMessage message);
+ void onReceived(NetworkMessage message, NetworkMember sender, String corellationId);
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/message/DefaultMessageMapperProvider.java b/modules/network/src/main/java/org/apache/ignite/network/message/DefaultMessageMapperProvider.java
new file mode 100644
index 0000000..32e8110
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/message/DefaultMessageMapperProvider.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.message;
+
+import java.io.IOException;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageMapperProvider;
+import org.apache.ignite.network.message.MessageMappingException;
+import org.apache.ignite.network.message.MessageSerializer;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Uses JDK serialization.
+ */
+public class DefaultMessageMapperProvider implements MessageMapperProvider<NetworkMessage> {
+ /** {@inheritDoc} */
+ @Override public MessageDeserializer<NetworkMessage> createDeserializer() {
+ return reader -> {
+ try {
+ return (NetworkMessage)reader.stream().readObject();
+ }
+ catch (Exception e) {
+ throw new MessageMappingException("Failed to deserialize", e);
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessageSerializer<NetworkMessage> createSerializer() {
+ return (message, writer) -> {
+ try {
+ writer.stream().writeObject(message);
+ }
+ catch (IOException e) {
+ throw new MessageMappingException("Failed to serialize", e);
+ }
+ };
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java
index 4c32e4b..2e66249 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java
@@ -65,7 +65,8 @@ public class ScaleCubeMessageHandler implements ClusterMessageHandler {
@Override public void onMessage(Message message) {
for (NetworkMessageHandler handler : messageHandlerHolder.messageHandlers()) {
NetworkMessage msg = message.data();
- handler.onReceived(msg);
+ NetworkMember sender = memberForAddress(message.sender());
+ handler.onReceived(msg, sender, message.correlationId());
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
index e83e798..f176dc2 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
@@ -19,6 +19,7 @@ package org.apache.ignite.network.scalecube;
import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.transport.api.Message;
import java.util.Collection;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@@ -30,6 +31,7 @@ import org.apache.ignite.network.NetworkMember;
import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.network.NetworkMessageHandler;
+import static io.scalecube.cluster.transport.api.Message.fromData;
import static java.time.Duration.ofMillis;
import static org.apache.ignite.network.scalecube.ScaleCubeMessageCodec.HEADER_MESSAGE_TYPE;
@@ -88,12 +90,20 @@ public class ScaleCubeNetworkCluster implements NetworkCluster {
/** {@inheritDoc} */
@Override public Future<Void> send(NetworkMember member, NetworkMessage msg) {
- return cluster.send(memberResolver.resolveMember(member), fromNetworkMessage(msg)).toFuture();
+ return cluster.send(memberResolver.resolveMember(member), fromData(msg)).toFuture();
+ }
+
+ @Override public Future<?> send(NetworkMember member, NetworkMessage msg, String corellationId) {
+ return cluster.send(memberResolver.resolveMember(member),
+ Message.withData(msg).header(HEADER_MESSAGE_TYPE, String.valueOf(msg.directType())).
+ correlationId(corellationId).build()).toFuture();
}
/** {@inheritDoc} */
- @Override public CompletableFuture<NetworkMessage> sendWithResponse(NetworkMember member, NetworkMessage msg, long timeout) {
- return cluster.requestResponse(memberResolver.resolveMember(member), fromNetworkMessage(msg))
+ @Override public CompletableFuture<NetworkMessage> invoke(NetworkMember member, NetworkMessage msg, long timeout) {
+ return cluster.requestResponse(memberResolver.resolveMember(member),
+ Message.withData(msg).correlationId(UUID.randomUUID().toString()).
+ header(HEADER_MESSAGE_TYPE, String.valueOf(msg.directType())).build())
.timeout(ofMillis(timeout)).toFuture().thenApply(m -> m.data());
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java
index 3d4cef6..8e2e650 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java
@@ -17,8 +17,10 @@
package org.apache.ignite.raft.client;
+import java.io.Serializable;
+
/**
* A marker interface for replication group command.
*/
-public interface Command {
+public interface Command extends Serializable {
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java
index d8b94d3..33ecc25 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java
@@ -17,12 +17,13 @@
package org.apache.ignite.raft.client;
+import java.io.Serializable;
import org.apache.ignite.network.NetworkMember;
/**
* A participant of a replication group.
*/
-public final class Peer {
+public final class Peer implements Serializable {
/**
* Network node.
*/
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftErrorCode.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftErrorCode.java
index 0346ed7..3784d1b 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftErrorCode.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftErrorCode.java
@@ -25,10 +25,16 @@ public enum RaftErrorCode {
SUCCESS(1000, "Successful"),
/** */
- NO_LEADER(1001, "No leader found within a timeout"),
+ NO_LEADER(1001, "No leader is found within a timeout"),
/** */
- LEADER_CHANGED(1002, "A peer is no longer a leader");
+ LEADER_CHANGED(1002, "A peer is no longer a leader"),
+
+ /** */
+ ILLEGAL_STATE(1003, "A peer is in illegal state"),
+
+ /** */
+ BUSY(1004, "A peer is busy, retry later");
/** */
private final int code;
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionRequest.java
index df2c103..ff1c140 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionRequest.java
@@ -17,13 +17,14 @@
package org.apache.ignite.raft.client.message;
+import java.io.Serializable;
import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.raft.client.Command;
/**
* Submit an action to a replication group.
*/
-public interface ActionRequest<T> extends NetworkMessage {
+public interface ActionRequest<T> extends NetworkMessage, Serializable {
/**
* @return Group id.
*/
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionResponse.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionResponse.java
index 8b187b8..40986c1 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionResponse.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionResponse.java
@@ -17,12 +17,13 @@
package org.apache.ignite.raft.client.message;
+import java.io.Serializable;
import org.apache.ignite.network.message.NetworkMessage;
/**
* The result of an action.
*/
-public interface ActionResponse<T> extends NetworkMessage {
+public interface ActionResponse<T> extends NetworkMessage, Serializable {
/**
* @return A result for this request, can be of any type.
*/
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequest.java
index 91adb59..3e79531 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.client.message;
+import java.io.Serializable;
import java.util.List;
import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.raft.client.Peer;
@@ -24,7 +25,7 @@ import org.apache.ignite.raft.client.Peer;
/**
* Add learners.
*/
-public interface AddLearnersRequest extends NetworkMessage {
+public interface AddLearnersRequest extends NetworkMessage, Serializable {
/**
* @return Group id.
*/
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeersRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeersRequest.java
index 6851dc3..7883f02 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeersRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeersRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.client.message;
+import java.io.Serializable;
import java.util.List;
import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.raft.client.Peer;
@@ -24,7 +25,7 @@ import org.apache.ignite.raft.client.Peer;
/**
* Add peers.
*/
-public interface AddPeersRequest extends NetworkMessage {
+public interface AddPeersRequest extends NetworkMessage, Serializable {
/**
* @return Group id.
*/
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeersResponse.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeersResponse.java
index 3b63490..72f171b 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeersResponse.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeersResponse.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.client.message;
+import java.io.Serializable;
import java.util.List;
import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.raft.client.Peer;
@@ -24,7 +25,7 @@ import org.apache.ignite.raft.client.Peer;
/**
* Change peers result.
*/
-public interface ChangePeersResponse extends NetworkMessage {
+public interface ChangePeersResponse extends NetworkMessage, Serializable {
/**
* @return Old peers.
*/
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequest.java
index c552033..f4cb03b 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequest.java
@@ -17,12 +17,13 @@
package org.apache.ignite.raft.client.message;
+import java.io.Serializable;
import org.apache.ignite.network.message.NetworkMessage;
/**
* Get leader.
*/
-public interface GetLeaderRequest extends NetworkMessage {
+public interface GetLeaderRequest extends NetworkMessage, Serializable {
/**
* @return Group id.
*/
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderResponse.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderResponse.java
index a1b0e1e..c2a01ee 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderResponse.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderResponse.java
@@ -17,13 +17,14 @@
package org.apache.ignite.raft.client.message;
+import java.io.Serializable;
import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.raft.client.Peer;
/**
* A current leader.
*/
-public interface GetLeaderResponse extends NetworkMessage {
+public interface GetLeaderResponse extends NetworkMessage, Serializable {
/**
* @return The leader.
*/
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequest.java
index 65de645..e0d94f5 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequest.java
@@ -17,10 +17,11 @@
package org.apache.ignite.raft.client.message;
+import java.io.Serializable;
import org.apache.ignite.network.message.NetworkMessage;
/** Get peers. */
-public interface GetPeersRequest extends NetworkMessage {
+public interface GetPeersRequest extends NetworkMessage, Serializable {
/**
* @return Group id.
*/
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersResponse.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersResponse.java
index e8c4e9f..78157a6 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersResponse.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersResponse.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.client.message;
+import java.io.Serializable;
import java.util.List;
import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.raft.client.Peer;
@@ -24,7 +25,7 @@ import org.apache.ignite.raft.client.Peer;
/**
*
*/
-public interface GetPeersResponse extends NetworkMessage {
+public interface GetPeersResponse extends NetworkMessage, Serializable {
/**
* @return Current peers.
*/
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/impl/RaftClientMessageFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactory.java
similarity index 70%
rename from modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/impl/RaftClientMessageFactory.java
rename to modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactory.java
index 7512dec..9fe6a07 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/impl/RaftClientMessageFactory.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactory.java
@@ -15,22 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.raft.client.message.impl;
-
-import org.apache.ignite.raft.client.message.AddLearnersRequest;
-import org.apache.ignite.raft.client.message.AddPeersRequest;
-import org.apache.ignite.raft.client.message.ChangePeersResponse;
-import org.apache.ignite.raft.client.message.GetLeaderRequest;
-import org.apache.ignite.raft.client.message.GetLeaderResponse;
-import org.apache.ignite.raft.client.message.GetPeersRequest;
-import org.apache.ignite.raft.client.message.GetPeersResponse;
-import org.apache.ignite.raft.client.message.RaftErrorResponse;
-import org.apache.ignite.raft.client.message.RemoveLearnersRequest;
-import org.apache.ignite.raft.client.message.RemovePeersRequest;
-import org.apache.ignite.raft.client.message.SnapshotRequest;
-import org.apache.ignite.raft.client.message.TransferLeadershipRequest;
-import org.apache.ignite.raft.client.message.ActionRequest;
-import org.apache.ignite.raft.client.message.ActionResponse;
+package org.apache.ignite.raft.client.message;
/**
* A factory for immutable replication group messages.
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftErrorResponse.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftErrorResponse.java
index 1f90b9b..3b92c63 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftErrorResponse.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftErrorResponse.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.client.message;
+import java.io.Serializable;
import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.RaftErrorCode;
@@ -25,7 +26,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Raft error response. Also used as a default response when errorCode == {@link RaftErrorCode#SUCCESS}
*/
-public interface RaftErrorResponse extends NetworkMessage {
+public interface RaftErrorResponse extends NetworkMessage, Serializable {
/**
* @return Error code.
*/
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequest.java
index 99d235d..2ba2d9c 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.client.message;
+import java.io.Serializable;
import java.util.List;
import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.raft.client.Peer;
@@ -24,7 +25,7 @@ import org.apache.ignite.raft.client.Peer;
/**
* Remove learners.
*/
-public interface RemoveLearnersRequest extends NetworkMessage {
+public interface RemoveLearnersRequest extends NetworkMessage, Serializable {
/**
* @return Group id.
*/
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeersRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeersRequest.java
index 93e5f2f..488e226 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeersRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeersRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.client.message;
+import java.io.Serializable;
import java.util.List;
import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.raft.client.Peer;
@@ -24,7 +25,7 @@ import org.apache.ignite.raft.client.Peer;
/**
* Remove peers.
*/
-public interface RemovePeersRequest extends NetworkMessage {
+public interface RemovePeersRequest extends NetworkMessage, Serializable {
/**
* @return Group id.
*/
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequest.java
index 4767aec..df607ec 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequest.java
@@ -17,12 +17,13 @@
package org.apache.ignite.raft.client.message;
+import java.io.Serializable;
import org.apache.ignite.network.message.NetworkMessage;
/**
* Take a local snapshot on the peer.
*/
-public interface SnapshotRequest extends NetworkMessage {
+public interface SnapshotRequest extends NetworkMessage, Serializable {
/**
* @return Group id.
*/
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeadershipRequest.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeadershipRequest.java
index f5cf876..e668f7c 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeadershipRequest.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeadershipRequest.java
@@ -17,13 +17,14 @@
package org.apache.ignite.raft.client.message;
+import java.io.Serializable;
import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.raft.client.Peer;
/**
* Transfer a leadership to receiving peer.
*/
-public interface TransferLeadershipRequest extends NetworkMessage {
+public interface TransferLeadershipRequest extends NetworkMessage, Serializable {
/**
* @return Group id.
*/
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/impl/RaftClientMessageFactoryImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/impl/RaftClientMessageFactoryImpl.java
index ce52677..e0d2cd5 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/impl/RaftClientMessageFactoryImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/impl/RaftClientMessageFactoryImpl.java
@@ -24,6 +24,7 @@ import org.apache.ignite.raft.client.message.GetLeaderRequest;
import org.apache.ignite.raft.client.message.GetLeaderResponse;
import org.apache.ignite.raft.client.message.GetPeersRequest;
import org.apache.ignite.raft.client.message.GetPeersResponse;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
import org.apache.ignite.raft.client.message.RaftErrorResponse;
import org.apache.ignite.raft.client.message.RemoveLearnersRequest;
import org.apache.ignite.raft.client.message.RemovePeersRequest;
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupCommandListener.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/CommandClosure.java
similarity index 64%
copy from modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupCommandListener.java
copy to modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/CommandClosure.java
index d22c334..3adb047 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupCommandListener.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/CommandClosure.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,21 +17,27 @@
package org.apache.ignite.raft.client.service;
-import java.util.Iterator;
-import org.apache.ignite.raft.client.ReadCommand;
-import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.Command;
/**
- * A listener for replication group commands.
+ * A closure to notify abbout command processing outcome.
+ * @param <R> Command type.
*/
-public interface RaftGroupCommandListener {
+public interface CommandClosure<R extends Command> {
/**
- * @param iterator Read command iterator.
+ * @return The command.
*/
- void onRead(Iterator<ReadCommand> iterator);
+ R command();
/**
- * @param iterator Write command iterator.
+ * Success outcome.
+ * @param res The result.
*/
- void onWrite(Iterator<WriteCommand> iterator);
+ void success(Object res);
+
+ /**
+ * Failure outcome.
+ * @param err The error.
+ */
+ void failure(Throwable err);
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupCommandListener.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupCommandListener.java
index d22c334..6dd14be 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupCommandListener.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupCommandListener.java
@@ -28,10 +28,10 @@ public interface RaftGroupCommandListener {
/**
* @param iterator Read command iterator.
*/
- void onRead(Iterator<ReadCommand> iterator);
+ void onRead(Iterator<CommandClosure<ReadCommand>> iterator);
/**
* @param iterator Write command iterator.
*/
- void onWrite(Iterator<WriteCommand> iterator);
+ void onWrite(Iterator<CommandClosure<WriteCommand>> iterator);
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
index 4662a0c..825a8c6 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
@@ -46,7 +46,7 @@ import org.apache.ignite.raft.client.message.RemoveLearnersRequest;
import org.apache.ignite.raft.client.message.RemovePeersRequest;
import org.apache.ignite.raft.client.message.SnapshotRequest;
import org.apache.ignite.raft.client.message.TransferLeadershipRequest;
-import org.apache.ignite.raft.client.message.impl.RaftClientMessageFactory;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.jetbrains.annotations.NotNull;
@@ -262,7 +262,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
@Override public CompletableFuture<Void> snapshot(Peer peer) {
SnapshotRequest req = factory.snapshotRequest().groupId(groupId).build();
- CompletableFuture<?> fut = cluster.sendWithResponse(peer.getNode(), req, timeout);
+ CompletableFuture<?> fut = cluster.invoke(peer.getNode(), req, timeout);
return fut.thenApply(resp -> null);
}
@@ -276,7 +276,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
TransferLeadershipRequest req = factory.transferLeaderRequest().groupId(groupId).peer(newLeader).build();
- CompletableFuture<?> fut = cluster.sendWithResponse(newLeader.getNode(), req, timeout);
+ CompletableFuture<?> fut = cluster.invoke(newLeader.getNode(), req, timeout);
return fut.thenApply(resp -> null);
}
@@ -299,7 +299,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
@Override public <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd) {
ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).build();
- CompletableFuture fut = cluster.sendWithResponse(peer.getNode(), req, timeout);
+ CompletableFuture fut = cluster.invoke(peer.getNode(), req, timeout);
return fut.thenApply(resp -> ((ActionResponse) resp).result());
}
@@ -307,7 +307,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
private <R> CompletableFuture<R> sendWithRetry(NetworkMember node, NetworkMessage req, long stopTime) {
if (currentTimeMillis() >= stopTime)
return CompletableFuture.failedFuture(new TimeoutException());
- return cluster.sendWithResponse(node, req, timeout)
+ return cluster.invoke(node, req, timeout)
.thenCompose(resp -> {
if (resp instanceof RaftErrorResponse) {
RaftErrorResponse resp0 = (RaftErrorResponse)resp;
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
index b8867ed..7512e24 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
@@ -30,7 +30,7 @@ import org.apache.ignite.raft.client.RaftErrorCode;
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.message.ActionRequest;
import org.apache.ignite.raft.client.message.GetLeaderRequest;
-import org.apache.ignite.raft.client.message.impl.RaftClientMessageFactory;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
import org.apache.ignite.raft.client.message.impl.RaftClientMessageFactoryImpl;
import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
import org.junit.jupiter.api.BeforeEach;
@@ -378,7 +378,7 @@ public class RaftGroupServiceTest {
return completedFuture(resp);
}
- }).when(cluster).sendWithResponse(any(), argThat(new ArgumentMatcher<ActionRequest>() {
+ }).when(cluster).invoke(any(), argThat(new ArgumentMatcher<ActionRequest>() {
@Override public boolean matches(ActionRequest arg) {
return arg.command() instanceof TestCommand;
}
@@ -408,7 +408,7 @@ public class RaftGroupServiceTest {
return completedFuture(resp);
}
- }).when(cluster).sendWithResponse(any(), any(GetLeaderRequest.class), anyLong());
+ }).when(cluster).invoke(any(), any(GetLeaderRequest.class), anyLong());
}
/** */
diff --git a/modules/raft/pom.xml b/modules/raft/pom.xml
new file mode 100644
index 0000000..483446f
--- /dev/null
+++ b/modules/raft/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>ignite-raft</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+
+ <dependencies>
+ <!-- logging -->
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-raft-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterCommandListener.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterCommandListener.java
new file mode 100644
index 0000000..854c8c7
--- /dev/null
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterCommandListener.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.server;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupCommandListener;
+
+/** */
+public class CounterCommandListener implements RaftGroupCommandListener {
+ /** */
+ private AtomicInteger counter = new AtomicInteger();
+
+ /** {@inheritDoc} */
+ @Override public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+ while (iterator.hasNext()) {
+ CommandClosure<ReadCommand> clo = iterator.next();
+
+ assert clo.command() instanceof GetValueCommand;
+
+ clo.success(counter.get());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+ while (iterator.hasNext()) {
+ CommandClosure<WriteCommand> clo = iterator.next();
+
+ IncrementAndGetCommand cmd0 = (IncrementAndGetCommand) clo.command();
+
+ clo.success(counter.addAndGet(cmd0.delta()));
+ }
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
similarity index 84%
copy from modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java
copy to modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
index 3d4cef6..96948a3 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-package org.apache.ignite.raft.client;
+package org.apache.ignite.raft.server;
-/**
- * A marker interface for replication group command.
- */
-public interface Command {
+import org.apache.ignite.raft.client.ReadCommand;
+
+/** */
+public class GetValueCommand implements ReadCommand {
}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
new file mode 100644
index 0000000..1378f0f
--- /dev/null
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.server;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.lang.LogWrapper;
+import org.apache.ignite.network.message.DefaultMessageMapperProvider;
+import org.apache.ignite.network.Network;
+import org.apache.ignite.network.NetworkCluster;
+import org.apache.ignite.network.scalecube.ScaleCubeMemberResolver;
+import org.apache.ignite.network.scalecube.ScaleCubeNetworkClusterFactory;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
+import org.apache.ignite.raft.client.message.impl.RaftClientMessageFactoryImpl;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.server.impl.RaftServerImpl;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** */
+class ITRaftCounterServerTest {
+ /** */
+ private static LogWrapper LOG = new LogWrapper(ITRaftCounterServerTest.class);
+
+ /** */
+ private static RaftClientMessageFactory FACTORY = new RaftClientMessageFactoryImpl();
+
+ /** */
+ private RaftServer server;
+
+ /** */
+ private static final String SERVER_ID = "testServer";
+
+ /** */
+ private static final String CLIENT_ID = "testClient";
+
+ /** */
+ private static final String COUNTER_GROUP_ID_0 = "counter0";
+
+ /** */
+ private static final String COUNTER_GROUP_ID_1 = "counter1";
+
+ /**
+ * @param testInfo Test info.
+ */
+ @BeforeEach
+ void before(TestInfo testInfo) throws Exception {
+ LOG.info(">>>> Starting test " + testInfo.getTestMethod().orElseThrow().getName());
+
+ server = new RaftServerImpl(SERVER_ID,
+ 20100,
+ FACTORY,
+ 1000,
+ Map.of(COUNTER_GROUP_ID_0, new CounterCommandListener(), COUNTER_GROUP_ID_1, new CounterCommandListener()));
+ }
+
+ /**
+ * @throws Exception
+ */
+ @AfterEach
+ void after() throws Exception {
+ server.shutdown();
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testRefreshLeader() throws Exception {
+ NetworkCluster client = startClient(CLIENT_ID, 20101, List.of("localhost:20100"));
+
+ assertTrue(waitForTopology(client, 2, 1000));
+
+ Peer server = new Peer(client.allMembers().stream().filter(m -> SERVER_ID.equals(m.name())).findFirst().orElseThrow());
+
+ RaftGroupService service = new RaftGroupServiceImpl(COUNTER_GROUP_ID_0, client, FACTORY, 1000,
+ List.of(server), true, 200);
+
+ Peer leader = service.leader();
+
+ assertNotNull(leader);
+ assertEquals(server.getNode().name(), leader.getNode().name());
+
+ client.shutdown();
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testCounterCommandListener() throws Exception {
+ NetworkCluster client = startClient(CLIENT_ID, 20101, List.of("localhost:20100"));
+
+ assertTrue(waitForTopology(client, 2, 1000));
+
+ Peer server = new Peer(client.allMembers().stream().filter(m -> SERVER_ID.equals(m.name())).findFirst().orElseThrow());
+
+ RaftGroupService service0 = new RaftGroupServiceImpl(COUNTER_GROUP_ID_0, client, FACTORY, 1000,
+ List.of(server), true, 200);
+
+ RaftGroupService service1 = new RaftGroupServiceImpl(COUNTER_GROUP_ID_1, client, FACTORY, 1000,
+ List.of(server), true, 200);
+
+ assertNotNull(service0.leader());
+ assertNotNull(service1.leader());
+
+ assertEquals(2, service0.<Integer>run(new IncrementAndGetCommand(2)).get());
+ assertEquals(2, service0.<Integer>run(new GetValueCommand()).get());
+ assertEquals(3, service0.<Integer>run(new IncrementAndGetCommand(1)).get());
+ assertEquals(3, service0.<Integer>run(new GetValueCommand()).get());
+
+ assertEquals(4, service1.<Integer>run(new IncrementAndGetCommand(4)).get());
+ assertEquals(4, service1.<Integer>run(new GetValueCommand()).get());
+ assertEquals(7, service1.<Integer>run(new IncrementAndGetCommand(3)).get());
+ assertEquals(7, service1.<Integer>run(new GetValueCommand()).get());
+
+ client.shutdown();
+ }
+
+ /**
+ * @param name Node name.
+ * @param port Local port.
+ * @param servers Server nodes of the cluster.
+ * @return The client cluster view.
+ */
+ private NetworkCluster startClient(String name, int port, List<String> servers) {
+ Network network = new Network(
+ new ScaleCubeNetworkClusterFactory(name, port, servers, new ScaleCubeMemberResolver())
+ );
+
+ network.registerMessageMapper((short)1000, new DefaultMessageMapperProvider());
+ network.registerMessageMapper((short)1001, new DefaultMessageMapperProvider());
+ network.registerMessageMapper((short)1005, new DefaultMessageMapperProvider());
+ network.registerMessageMapper((short)1006, new DefaultMessageMapperProvider());
+ network.registerMessageMapper((short)1009, new DefaultMessageMapperProvider());
+
+ return network.start();
+ }
+
+ /**
+ * @param cluster The cluster.
+ * @param expected Expected count.
+ * @param timeout The timeout in millis.
+ * @return {@code True} if topology size is equal to expected.
+ */
+ private boolean waitForTopology(NetworkCluster cluster, int expected, int timeout) {
+ long stop = System.currentTimeMillis() + timeout;
+
+ while(System.currentTimeMillis() < stop) {
+ if (cluster.allMembers().size() >= expected)
+ return true;
+
+ try {
+ Thread.sleep(50);
+ }
+ catch (InterruptedException e) {
+ return false;
+ }
+ }
+
+ return false;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/IncrementAndGetCommand.java
similarity index 65%
copy from modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java
copy to modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/IncrementAndGetCommand.java
index 3d4cef6..81e1988 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Command.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/IncrementAndGetCommand.java
@@ -15,10 +15,26 @@
* limitations under the License.
*/
-package org.apache.ignite.raft.client;
+package org.apache.ignite.raft.server;
-/**
- * A marker interface for replication group command.
- */
-public interface Command {
+import org.apache.ignite.raft.client.WriteCommand;
+
+/** */
+public class IncrementAndGetCommand implements WriteCommand {
+ /** */
+ private final int delta;
+
+ /**
+ * @param delta The delta.
+ */
+ public IncrementAndGetCommand(int delta) {
+ this.delta = delta;
+ }
+
+ /**
+ * @return The delta.
+ */
+ public int delta() {
+ return delta;
+ }
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/server/RaftServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/server/RaftServer.java
new file mode 100644
index 0000000..d6bcefd
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/server/RaftServer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.server;
+
+import org.apache.ignite.network.NetworkMember;
+import org.apache.ignite.raft.client.service.RaftGroupCommandListener;
+
+/**
+ * The RAFT protocol based replication server.
+ * <p>
+ * Supports multiple RAFT groups.
+ * <p>
+ * The server listens for client commands, submits them to a replicated log and calls {@link RaftGroupCommandListener}
+ * {@code onRead} and {@code onWrite} methods then after the command was committed to the log.
+ */
+public interface RaftServer {
+ /**
+ * @return Local member.
+ */
+ NetworkMember localMember();
+
+ /**
+ * Set a listener for group commands.
+ * @param groupId group id.
+ * @param lsnr Listener.
+ */
+ void setListener(String groupId, RaftGroupCommandListener lsnr);
+
+ /**
+ * Remove a command listener.
+ * @param groupId Group id.
+ */
+ void clearListener(String groupId);
+
+ /**
+ * Shutdown a server.
+ *
+ * @throws Exception
+ */
+ void shutdown() throws Exception;
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java
new file mode 100644
index 0000000..45c4283
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.server.impl;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.BiConsumer;
+import org.apache.ignite.lang.LogWrapper;
+import org.apache.ignite.network.message.DefaultMessageMapperProvider;
+import org.apache.ignite.network.Network;
+import org.apache.ignite.network.NetworkCluster;
+import org.apache.ignite.network.NetworkHandlersProvider;
+import org.apache.ignite.network.NetworkMember;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.network.scalecube.ScaleCubeMemberResolver;
+import org.apache.ignite.network.scalecube.ScaleCubeNetworkClusterFactory;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.RaftErrorCode;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.exception.RaftException;
+import org.apache.ignite.raft.client.message.ActionRequest;
+import org.apache.ignite.raft.client.message.GetLeaderRequest;
+import org.apache.ignite.raft.client.message.GetLeaderResponse;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
+import org.apache.ignite.raft.client.message.RaftErrorResponse;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupCommandListener;
+import org.apache.ignite.raft.server.RaftServer;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A single node server implementation.
+ */
+public class RaftServerImpl implements RaftServer {
+ /** */
+ private static LogWrapper LOG = new LogWrapper(RaftServerImpl.class);
+
+ /** */
+ private final String id;
+
+ /** */
+ private final int localPort;
+
+ /** */
+ private final RaftClientMessageFactory clientMsgFactory;
+
+ /** */
+ private final NetworkCluster server;
+
+ /** */
+ private final ConcurrentMap<String, RaftGroupCommandListener> listeners = new ConcurrentHashMap<>();
+
+ /** */
+ private final BlockingQueue<CommandClosureEx<ReadCommand>> readQueue;
+
+ /** */
+ private final BlockingQueue<CommandClosureEx<WriteCommand>> writeQueue;
+
+ /** */
+ private final Thread readWorker;
+
+ /** */
+ private final Thread writeWorker;
+
+ /**
+ * @param id Server id.
+ * @param localPort Local port.
+ * @param clientMsgFactory Client message factory.
+ * @param queueSize Queue size.
+ * @param listeners Command listeners.
+ */
+ public RaftServerImpl(
+ @NotNull String id,
+ int localPort,
+ @NotNull RaftClientMessageFactory clientMsgFactory,
+ int queueSize,
+ Map<String, RaftGroupCommandListener> listeners
+ ) {
+ Objects.requireNonNull(id);
+ Objects.requireNonNull(clientMsgFactory);
+
+ this.id = id;
+ this.localPort = localPort;
+ this.clientMsgFactory = clientMsgFactory;
+
+ if (listeners != null)
+ this.listeners.putAll(listeners);
+
+ readQueue = new ArrayBlockingQueue<CommandClosureEx<ReadCommand>>(queueSize);
+ writeQueue = new ArrayBlockingQueue<CommandClosureEx<WriteCommand>>(queueSize);
+
+ Network network = new Network(
+ new ScaleCubeNetworkClusterFactory(id, localPort, List.of(), new ScaleCubeMemberResolver())
+ );
+
+ network.registerMessageMapper((short)1000, new DefaultMessageMapperProvider());
+ network.registerMessageMapper((short)1001, new DefaultMessageMapperProvider());
+ network.registerMessageMapper((short)1005, new DefaultMessageMapperProvider());
+ network.registerMessageMapper((short)1006, new DefaultMessageMapperProvider());
+ network.registerMessageMapper((short)1009, new DefaultMessageMapperProvider());
+
+ server = network.start();
+
+ server.addHandlersProvider(new NetworkHandlersProvider() {
+ @Override public NetworkMessageHandler messageHandler() {
+ return new NetworkMessageHandler() {
+ @Override public void onReceived(NetworkMessage req, NetworkMember sender, String corellationId) {
+
+ if (req instanceof GetLeaderRequest) {
+ GetLeaderResponse resp = clientMsgFactory.getLeaderResponse().leader(new Peer(server.localMember())).build();
+
+ server.send(sender, resp, corellationId);
+ }
+ else if (req instanceof ActionRequest) {
+ ActionRequest req0 = (ActionRequest) req;
+
+ RaftGroupCommandListener lsnr = listeners.get(req0.groupId());
+
+ if (lsnr == null) {
+ sendError(sender, corellationId, RaftErrorCode.ILLEGAL_STATE);
+
+ return;
+ }
+
+ if (req0.command() instanceof ReadCommand) {
+ handleActionRequest(sender, req0, corellationId, readQueue, lsnr);
+ }
+ else {
+ handleActionRequest(sender, req0, corellationId, writeQueue, lsnr);
+ }
+ }
+ else {
+ LOG.warn("Unsupported message class " + req.getClass().getName());
+ }
+ }
+ };
+ }
+ });
+
+ readWorker = new Thread(() -> processQueue(readQueue, (l, i) -> l.onRead(i)), "read-cmd-worker#" + id);
+ readWorker.setDaemon(true);
+ readWorker.start();
+
+ writeWorker = new Thread(() -> processQueue(writeQueue, (l, i) -> l.onWrite(i)), "write-cmd-worker#" + id);
+ writeWorker.setDaemon(true);
+ writeWorker.start();
+
+ LOG.info("Started replication server [id=" + id + ", localPort=" + localPort + ']');
+ }
+
+ /** {@inheritDoc} */
+ @Override public NetworkMember localMember() {
+ return server.localMember();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setListener(String groupId, RaftGroupCommandListener lsnr) {
+ listeners.put(groupId, lsnr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clearListener(String groupId) {
+ listeners.remove(groupId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void shutdown() throws Exception {
+ server.shutdown();
+
+ readWorker.interrupt();
+ readWorker.join();
+
+ writeWorker.interrupt();
+ writeWorker.join();
+
+ LOG.info("Stopped replication server [id=" + id + ", localPort=" + localPort + ']');
+ }
+
+ private <T extends Command> void handleActionRequest(
+ NetworkMember sender,
+ ActionRequest req,
+ String corellationId,
+ BlockingQueue<CommandClosureEx<T>> queue,
+ RaftGroupCommandListener lsnr
+ ) {
+ if (!queue.offer(new CommandClosureEx<T>() {
+ @Override public RaftGroupCommandListener listener() {
+ return lsnr;
+ }
+
+ @Override public T command() {
+ return (T) req.command();
+ }
+
+ @Override public void success(Object res) {
+ server.send(sender, clientMsgFactory.actionResponse().result(res).build(), corellationId);
+ }
+
+ @Override public void failure(Throwable t) {
+ sendError(sender, corellationId, RaftErrorCode.ILLEGAL_STATE);
+ }
+ })) {
+ // Queue out of capacity.
+ sendError(sender, corellationId, RaftErrorCode.BUSY);
+
+ return;
+ }
+ }
+
+ private <T extends Command> void processQueue(
+ BlockingQueue<CommandClosureEx<T>> queue,
+ BiConsumer<RaftGroupCommandListener, Iterator<CommandClosure<T>>> clo
+ ) {
+ while (!Thread.interrupted()) {
+ try {
+ CommandClosureEx<T> cmdClo = queue.take();
+
+ RaftGroupCommandListener lsnr = cmdClo.listener();
+
+ if (lsnr == null)
+ cmdClo.failure(new RaftException(RaftErrorCode.ILLEGAL_STATE));
+ else
+ clo.accept(lsnr, List.<CommandClosure<T>>of(cmdClo).iterator());
+ }
+ catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
+
+ private void sendError(NetworkMember sender, String corellationId, RaftErrorCode errorCode) {
+ RaftErrorResponse resp = clientMsgFactory.raftErrorResponse().errorCode(errorCode).build();
+
+ server.send(sender, resp, corellationId);
+ }
+
+ private interface CommandClosureEx<T extends Command> extends CommandClosure<T> {
+ RaftGroupCommandListener listener();
+ }
+}
diff --git a/pom.xml b/pom.xml
index a81b672..9fd5b78 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,6 +44,7 @@
<module>modules/metastorage-client</module>
<module>modules/metastorage-common</module>
<module>modules/network</module>
+ <module>modules/raft</module>
<module>modules/raft-client</module>
<module>modules/rest</module>
<module>modules/runner</module>