You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/03/01 17:06:09 UTC
[ignite-3] 02/02: IGNITE-14149 Migrate to ignite network.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch ignite-14149
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 399f640174b26a38e336024c212debf42b80ef01
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Mon Mar 1 20:05:49 2021 +0300
IGNITE-14149 Migrate to ignite network.
---
.../org/apache/ignite/network/NetworkCluster.java | 15 ++-
.../network/scalecube/ScaleCubeNetworkCluster.java | 19 +--
modules/raft-client/pom.xml | 5 +
.../org/apache/ignite/raft/ElectionPriority.java | 2 +-
.../main/java/org/apache/ignite/raft/PeerId.java | 18 +--
.../java/org/apache/ignite/raft/RaftException.java | 16 ---
.../ignite/raft/client/RaftClientMessages.java | 66 ++++------
.../client/message/AddLearnersRequestImpl.java | 2 +-
.../raft/client/message/AddPeerRequestImpl.java | 2 +-
.../raft/client/message/ChangePeerRequestImpl.java | 2 +-
.../raft/client/message/GetLeaderRequestImpl.java | 2 +-
.../raft/client/message/GetPeersRequestImpl.java | 2 +-
.../raft/client/message/PingRequestImpl.java | 21 ----
...rFactory.java => RaftClientMessageFactory.java} | 6 +-
...tory.java => RaftClientMessageFactoryImpl.java} | 14 +--
.../client/message/RemoveLearnersRequestImpl.java | 2 +-
.../raft/client/message/RemovePeerRequestImpl.java | 2 +-
.../client/message/ResetLearnersRequestImpl.java | 2 +-
.../raft/client/message/ResetPeerRequestImpl.java | 2 +-
.../raft/client/message/SnapshotRequestImpl.java | 2 +-
.../raft/client/message/StatusResponseImpl.java | 32 -----
.../client/message/TransferLeaderRequestImpl.java | 2 +-
.../raft/client/message/UserRequestImpl.java | 10 +-
.../ignite/raft/client/rpc/RaftGroupRpcClient.java | 30 +++--
.../client/rpc/impl/RaftGroupRpcClientImpl.java | 135 +++++----------------
.../service/RaftGroupClientRequestService.java | 2 +-
.../client/service/RaftGroupManagmentService.java | 8 +-
.../impl/RaftGroupClientRequestServiceImpl.java | 12 +-
.../impl/RaftGroupManagementServiceImpl.java | 2 +-
.../org/apache/ignite/raft/rpc/InvokeCallback.java | 24 ----
.../java/org/apache/ignite/raft/rpc/Message.java | 4 -
.../main/java/org/apache/ignite/raft/rpc/Node.java | 10 --
.../java/org/apache/ignite/raft/rpc/NodeImpl.java | 49 --------
.../apache/ignite/raft/rpc/RaftGroupMessage.java | 8 --
.../java/org/apache/ignite/raft/rpc/RpcClient.java | 59 ---------
.../org/apache/ignite/raft/client/MockUtils.java | 71 +++++++++++
.../raft/client/rpc/RaftGroupRpcClientTest.java | 118 ++++++------------
.../service/RaftGroupClientRequestServiceTest.java | 104 +++-------------
38 files changed, 257 insertions(+), 625 deletions(-)
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
index 5bdd576..f271b53 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
@@ -17,6 +17,7 @@
package org.apache.ignite.network;
import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
/**
@@ -59,7 +60,19 @@ public interface NetworkCluster {
* @param member Network member which should receive the message.
* @param msg Message which should be delivered.
*/
- Future<?> guaranteedSend(NetworkMember member, Object msg);
+ Future<?> send(NetworkMember member, Object msg);
+
+ /**
+ * Sends asynchronously a message with same guarantees as for {@link #send(NetworkMember, Object)} and
+ * returns a response (RPC style).
+ *
+ * @param member Network member which should receive the message.
+ * @param msg A message.
+ * @param timeout Waiting for response timeout in milliseconds.
+ * @param <R> Expected response type.
+ * @return A future holding the response or error if the expected response was not received.
+ */
+ <R> CompletableFuture<R> sendWithResponse(NetworkMember member, Object msg, long timeout);
/**
* Add provider which allows to get configured handlers for different cluster events(ex. received message).
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
index 0dc1087..f2ebbac 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
@@ -16,9 +16,12 @@
*/
package org.apache.ignite.network.scalecube;
+import io.scalecube.cluster.transport.api.Message;
+import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
+import java.util.function.Function;
import java.util.stream.Collectors;
import io.scalecube.cluster.Cluster;
import org.apache.ignite.network.NetworkCluster;
@@ -29,6 +32,7 @@ import org.apache.ignite.network.NetworkMessageHandler;
import org.apache.ignite.network.MessageHandlerHolder;
import static io.scalecube.cluster.transport.api.Message.fromData;
+import static java.time.Duration.ofMillis;
/**
* Implementation of {@link NetworkCluster} based on ScaleCube.
@@ -84,15 +88,14 @@ public class ScaleCubeNetworkCluster implements NetworkCluster {
}
/** {@inheritDoc} */
- @Override public Future<?> guaranteedSend(NetworkMember member, Object msg) {
- cluster.send(memberResolver.resolveMember(member), fromData(msg))
- .block();
-
- CompletableFuture<Object> future = new CompletableFuture<>();
-
- future.complete(null);
+ @Override public Future<?> send(NetworkMember member, Object msg) {
+ return cluster.send(memberResolver.resolveMember(member), fromData(msg)).toFuture();
+ }
- return future;
+ /** {@inheritDoc} */
+ @Override public <R> CompletableFuture<R> sendWithResponse(NetworkMember member, Object msg, long timeout) {
+ return cluster.requestResponse(memberResolver.resolveMember(member), fromData(msg))
+ .timeout(ofMillis(timeout)).toFuture().thenApply(m -> m.data());
}
/** {@inheritDoc} */
diff --git a/modules/raft-client/pom.xml b/modules/raft-client/pom.xml
index 6242962..06be309 100644
--- a/modules/raft-client/pom.xml
+++ b/modules/raft-client/pom.xml
@@ -36,6 +36,11 @@
<dependencies>
<dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-network</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java
index 376a589..8dd5ddf 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/ElectionPriority.java
@@ -21,7 +21,7 @@ package org.apache.ignite.raft;
*/
public class ElectionPriority {
/**
- * Priority -1 represents this node disabled the priority election function.
+ * Priority -1 means this node has disabled the priority election function.
*/
public static final int DISABLED = -1;
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
index 7b5fec0..9587c87 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
@@ -17,21 +17,21 @@
package org.apache.ignite.raft;
import java.io.Serializable;
-import org.apache.ignite.raft.rpc.Node;
+import org.apache.ignite.network.NetworkMember;
/**
* Represents a participant in a replicating group.
*/
-public class PeerId implements Serializable {
+public final class PeerId implements Serializable {
private static final long serialVersionUID = 8083529734784884641L;
/**
- * Owning node.
+ * Cluster node for peer.
*/
- private final Node node;
+ private final NetworkMember node;
/**
- * Node's local priority value, if node don't support priority election, this value is -1.
+ * Peer's local priority value, if node don't support priority election, this value is -1.
*/
private final int priority;
@@ -40,17 +40,17 @@ public class PeerId implements Serializable {
this.priority = peer.getPriority();
}
- public PeerId(Node node) {
+ public PeerId(NetworkMember node) {
this(node, ElectionPriority.DISABLED);
}
- public PeerId(final Node node, final int priority) {
+ public PeerId(final NetworkMember node, final int priority) {
super();
this.node = node;
this.priority = priority;
}
- public Node getNode() {
+ public NetworkMember getNode() {
return this.node;
}
@@ -77,6 +77,6 @@ public class PeerId implements Serializable {
}
@Override public String toString() {
- return node.id() + ":" + priority;
+ return node.name() + ":" + priority;
}
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/RaftException.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/RaftException.java
deleted file mode 100644
index f63f6f0..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/RaftException.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.ignite.raft;
-
-/** */
-public class RaftException extends Exception {
- private final int statusCode;
-
- public RaftException(int statusCode, String statusMsg) {
- super(statusMsg);
-
- this.statusCode = statusCode;
- }
-
- public int getStatusCode() {
- return statusCode;
- }
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientMessages.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientMessages.java
index 3f44685..c809a0d 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientMessages.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientMessages.java
@@ -21,8 +21,6 @@ package org.apache.ignite.raft.client;
import java.util.List;
import org.apache.ignite.raft.PeerId;
-import org.apache.ignite.raft.rpc.Message;
-import org.apache.ignite.raft.rpc.RaftGroupMessage;
/**
*
@@ -31,21 +29,7 @@ public final class RaftClientMessages {
private RaftClientMessages() {
}
- public interface StatusResponse extends Message {
- int getStatusCode();
-
- String getStatusMsg();
-
- interface Builder {
- Builder setStatusCode(int code);
-
- Builder setStatusMsg(String msg);
-
- StatusResponse build();
- }
- }
-
- public interface PingRequest extends Message {
+ public interface PingRequest {
long getSendTimestamp();
interface Builder {
@@ -55,7 +39,7 @@ public final class RaftClientMessages {
}
}
- public interface AddPeerRequest extends RaftGroupMessage {
+ public interface AddPeerRequest {
PeerId getPeerId();
interface Builder {
@@ -67,7 +51,7 @@ public final class RaftClientMessages {
}
}
- public interface AddPeerResponse extends Message {
+ public interface AddPeerResponse {
List<PeerId> getOldPeersList();
List<PeerId> getNewPeersList();
@@ -81,7 +65,7 @@ public final class RaftClientMessages {
}
}
- public interface RemovePeerRequest extends RaftGroupMessage {
+ public interface RemovePeerRequest {
PeerId getPeerId();
interface Builder {
@@ -93,7 +77,7 @@ public final class RaftClientMessages {
}
}
- public interface RemovePeerResponse extends Message {
+ public interface RemovePeerResponse {
List<PeerId> getOldPeersList();
List<PeerId> getNewPeersList();
@@ -107,7 +91,7 @@ public final class RaftClientMessages {
}
}
- public interface ChangePeersRequest extends RaftGroupMessage {
+ public interface ChangePeersRequest {
List<PeerId> getNewPeersList();
public interface Builder {
@@ -119,7 +103,7 @@ public final class RaftClientMessages {
}
}
- public interface ChangePeersResponse extends Message {
+ public interface ChangePeersResponse {
List<PeerId> getOldPeersList();
List<PeerId> getNewPeersList();
@@ -133,7 +117,7 @@ public final class RaftClientMessages {
}
}
- public interface SnapshotRequest extends RaftGroupMessage {
+ public interface SnapshotRequest {
public interface Builder {
Builder setGroupId(String groupId);
@@ -141,7 +125,7 @@ public final class RaftClientMessages {
}
}
- public interface ResetPeerRequest extends RaftGroupMessage {
+ public interface ResetPeerRequest {
List<PeerId> getNewPeersList();
public interface Builder {
@@ -153,7 +137,7 @@ public final class RaftClientMessages {
}
}
- public interface TransferLeaderRequest extends RaftGroupMessage {
+ public interface TransferLeaderRequest {
PeerId getPeerId();
public interface Builder {
@@ -163,7 +147,7 @@ public final class RaftClientMessages {
}
}
- public interface GetLeaderRequest extends RaftGroupMessage {
+ public interface GetLeaderRequest {
public interface Builder {
Builder setGroupId(String groupId);
@@ -171,7 +155,7 @@ public final class RaftClientMessages {
}
}
- public interface GetLeaderResponse extends Message {
+ public interface GetLeaderResponse {
PeerId getLeaderId();
public interface Builder {
@@ -181,7 +165,7 @@ public final class RaftClientMessages {
}
}
- public interface GetPeersRequest extends RaftGroupMessage {
+ public interface GetPeersRequest {
boolean getOnlyAlive();
public interface Builder {
@@ -193,7 +177,7 @@ public final class RaftClientMessages {
}
}
- public interface GetPeersResponse extends Message {
+ public interface GetPeersResponse {
List<PeerId> getPeersList();
List<PeerId> getLearnersList();
@@ -207,7 +191,7 @@ public final class RaftClientMessages {
}
}
- public interface AddLearnersRequest extends RaftGroupMessage {
+ public interface AddLearnersRequest {
List<PeerId> getLearnersList();
public interface Builder {
@@ -219,7 +203,7 @@ public final class RaftClientMessages {
}
}
- public interface RemoveLearnersRequest extends RaftGroupMessage {
+ public interface RemoveLearnersRequest {
List<PeerId> getLearnersList();
public interface Builder {
@@ -231,7 +215,7 @@ public final class RaftClientMessages {
}
}
- public interface ResetLearnersRequest extends RaftGroupMessage {
+ public interface ResetLearnersRequest {
List<PeerId> getLearnersList();
public interface Builder {
@@ -243,7 +227,7 @@ public final class RaftClientMessages {
}
}
- public interface LearnersOpResponse extends Message {
+ public interface LearnersOpResponse {
List<PeerId> getOldLearnersList();
List<PeerId> getNewLearnersList();
@@ -257,19 +241,21 @@ public final class RaftClientMessages {
}
}
- public interface UserRequest<T> extends RaftGroupMessage {
- T request();
+ public interface UserRequest {
+ Object request();
- public interface Builder<T> {
- Builder setRequest(T request);
+ String getGroupId();
+
+ public interface Builder {
+ Builder setRequest(Object request);
Builder setGroupId(String groupId);
- UserRequest<T> build();
+ UserRequest build();
}
}
- public interface UserResponse<T> extends Message {
+ public interface UserResponse<T> {
T response();
public interface Builder<T> {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequestImpl.java
index 2bb1125..dec9076 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddLearnersRequestImpl.java
@@ -10,7 +10,7 @@ public class AddLearnersRequestImpl implements RaftClientMessages.AddLearnersReq
private PeerId leaderId;
private List<PeerId> learnersList = new ArrayList<>();
- @Override public String getGroupId() {
+ public String getGroupId() {
return groupId;
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeerRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeerRequestImpl.java
index ba6d2c8..d18ee14 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeerRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/AddPeerRequestImpl.java
@@ -8,7 +8,7 @@ class AddPeerRequestImpl implements RaftClientMessages.AddPeerRequest, RaftClien
private PeerId leaderId;
private PeerId peerId;
- @Override public String getGroupId() {
+ public String getGroupId() {
return groupId;
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeerRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeerRequestImpl.java
index 47d3abc..4c611c7 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeerRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ChangePeerRequestImpl.java
@@ -9,7 +9,7 @@ class ChangePeerRequestImpl implements RaftClientMessages.ChangePeersRequest, Ra
private String groupId;
private List<PeerId> newPeersList = new ArrayList<>();
- @Override public String getGroupId() {
+ public String getGroupId() {
return groupId;
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java
index dbc7566..4162f81 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java
@@ -6,7 +6,7 @@ import org.apache.ignite.raft.client.RaftClientMessages;
public class GetLeaderRequestImpl implements RaftClientMessages.GetLeaderRequest, RaftClientMessages.GetLeaderRequest.Builder {
private String groupId;
- @Override public String getGroupId() {
+ public String getGroupId() {
return groupId;
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequestImpl.java
index ea50a57..d3a2d99 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetPeersRequestImpl.java
@@ -6,7 +6,7 @@ class GetPeersRequestImpl implements RaftClientMessages.GetPeersRequest, RaftCli
private String groupId;
private boolean onlyAlive;
- @Override public String getGroupId() {
+ public String getGroupId() {
return groupId;
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/PingRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/PingRequestImpl.java
deleted file mode 100644
index b0c0a78..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/PingRequestImpl.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package org.apache.ignite.raft.client.message;
-
-import org.apache.ignite.raft.client.RaftClientMessages;
-
-class PingRequestImpl implements RaftClientMessages.PingRequest, RaftClientMessages.PingRequest.Builder {
- private long sendTimestamp;
-
- @Override public long getSendTimestamp() {
- return sendTimestamp;
- }
-
- @Override public Builder setSendTimestamp(long timestamp) {
- this.sendTimestamp = timestamp;
-
- return this;
- }
-
- @Override public RaftClientMessages.PingRequest build() {
- return this;
- }
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ClientMessageBuilderFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactory.java
similarity index 89%
rename from modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ClientMessageBuilderFactory.java
rename to modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactory.java
index 4351a67..95a350d 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ClientMessageBuilderFactory.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactory.java
@@ -3,11 +3,7 @@ package org.apache.ignite.raft.client.message;
import org.apache.ignite.raft.client.RaftClientMessages;
/** */
-public interface ClientMessageBuilderFactory {
- RaftClientMessages.PingRequest.Builder createPingRequest();
-
- RaftClientMessages.StatusResponse.Builder createStatusResponse();
-
+public interface RaftClientMessageFactory {
RaftClientMessages.AddPeerRequest.Builder createAddPeerRequest();
RaftClientMessages.AddPeerResponse.Builder createAddPeerResponse();
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageBuilderFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactoryImpl.java
similarity index 85%
rename from modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageBuilderFactory.java
rename to modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactoryImpl.java
index 2d47315..6e170ae 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageBuilderFactory.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientMessageFactoryImpl.java
@@ -3,18 +3,10 @@ package org.apache.ignite.raft.client.message;
import org.apache.ignite.raft.client.RaftClientMessages;
/**
- * Raft client message builders factory.
+ * Raft client message factory.
*/
-public class RaftClientMessageBuilderFactory implements ClientMessageBuilderFactory {
- public static RaftClientMessageBuilderFactory INSTANCE = new RaftClientMessageBuilderFactory();
-
- @Override public RaftClientMessages.PingRequest.Builder createPingRequest() {
- return new PingRequestImpl();
- }
-
- @Override public RaftClientMessages.StatusResponse.Builder createStatusResponse() {
- return new StatusResponseImpl();
- }
+public class RaftClientMessageFactoryImpl implements RaftClientMessageFactory {
+ public static RaftClientMessageFactoryImpl INSTANCE = new RaftClientMessageFactoryImpl();
@Override public RaftClientMessages.AddPeerRequest.Builder createAddPeerRequest() {
return new AddPeerRequestImpl();
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequestImpl.java
index e55aefc..b319b69 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemoveLearnersRequestImpl.java
@@ -9,7 +9,7 @@ class RemoveLearnersRequestImpl implements RaftClientMessages.RemoveLearnersRequ
private String groupId;
private List<PeerId> learnersList = new ArrayList<>();
- @Override public String getGroupId() {
+ public String getGroupId() {
return groupId;
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeerRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeerRequestImpl.java
index 0e4934d..a6d57b1 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeerRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RemovePeerRequestImpl.java
@@ -7,7 +7,7 @@ class RemovePeerRequestImpl implements RaftClientMessages.RemovePeerRequest, Raf
private String groupId;
private PeerId peerId;
- @Override public String getGroupId() {
+ public String getGroupId() {
return groupId;
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetLearnersRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetLearnersRequestImpl.java
index 340a39f..e1d144d 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetLearnersRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetLearnersRequestImpl.java
@@ -9,7 +9,7 @@ class ResetLearnersRequestImpl implements RaftClientMessages.ResetLearnersReques
private String groupId;
private List<PeerId> learnersList = new ArrayList<>();
- @Override public String getGroupId() {
+ public String getGroupId() {
return groupId;
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetPeerRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetPeerRequestImpl.java
index 6c8d4bc..4ead585 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetPeerRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ResetPeerRequestImpl.java
@@ -9,7 +9,7 @@ class ResetPeerRequestImpl implements RaftClientMessages.ResetPeerRequest, RaftC
private String groupId;
private List<PeerId> newPeersList = new ArrayList<>();
- @Override public String getGroupId() {
+ public String getGroupId() {
return groupId;
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequestImpl.java
index 08a09dc..2b2eea6 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/SnapshotRequestImpl.java
@@ -5,7 +5,7 @@ import org.apache.ignite.raft.client.RaftClientMessages;
class SnapshotRequestImpl implements RaftClientMessages.SnapshotRequest, RaftClientMessages.SnapshotRequest.Builder {
private String groupId;
- @Override public String getGroupId() {
+ public String getGroupId() {
return groupId;
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/StatusResponseImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/StatusResponseImpl.java
deleted file mode 100644
index 6ae0b6b..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/StatusResponseImpl.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package org.apache.ignite.raft.client.message;
-
-import org.apache.ignite.raft.client.RaftClientMessages;
-
-class StatusResponseImpl implements RaftClientMessages.StatusResponse, RaftClientMessages.StatusResponse.Builder {
- private int errorCode;
- private String errorMsg = "";
-
- @Override public int getStatusCode() {
- return errorCode;
- }
-
- @Override public Builder setStatusCode(int errorCode) {
- this.errorCode = errorCode;
-
- return this;
- }
-
- @Override public String getStatusMsg() {
- return errorMsg;
- }
-
- @Override public Builder setStatusMsg(String errorMsg) {
- this.errorMsg = errorMsg;
-
- return this;
- }
-
- @Override public RaftClientMessages.StatusResponse build() {
- return this;
- }
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeaderRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeaderRequestImpl.java
index 2cc6592..24957df 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeaderRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/TransferLeaderRequestImpl.java
@@ -7,7 +7,7 @@ class TransferLeaderRequestImpl implements RaftClientMessages.TransferLeaderRequ
private String groupId;
private PeerId peerId;
- @Override public String getGroupId() {
+ public String getGroupId() {
return groupId;
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/UserRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/UserRequestImpl.java
index 06d667e..93cc463 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/UserRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/UserRequestImpl.java
@@ -2,11 +2,11 @@ package org.apache.ignite.raft.client.message;
import org.apache.ignite.raft.client.RaftClientMessages;
-public class UserRequestImpl<T> implements RaftClientMessages.UserRequest<T>, RaftClientMessages.UserRequest.Builder<T> {
- private T request;
+public class UserRequestImpl<T> implements RaftClientMessages.UserRequest, RaftClientMessages.UserRequest.Builder {
+ private Object request;
private String groupId;
- @Override public T request() {
+ @Override public Object request() {
return request;
}
@@ -16,13 +16,13 @@ public class UserRequestImpl<T> implements RaftClientMessages.UserRequest<T>, Ra
return this;
}
- @Override public Builder setRequest(T request) {
+ @Override public Builder setRequest(Object request) {
this.request = request;
return this;
}
- @Override public RaftClientMessages.UserRequest<T> build() {
+ @Override public RaftClientMessages.UserRequest build() {
return this;
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
index 14384e9..382e953 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
@@ -17,11 +17,12 @@
package org.apache.ignite.raft.client.rpc;
import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
import org.apache.ignite.raft.State;
import org.apache.ignite.raft.PeerId;
-import org.apache.ignite.raft.client.message.ClientMessageBuilderFactory;
-import org.apache.ignite.raft.rpc.Message;
-import org.apache.ignite.raft.rpc.RaftGroupMessage;
+import org.apache.ignite.raft.client.RaftClientMessages.UserRequest;
+import org.apache.ignite.raft.client.RaftClientMessages.UserResponse;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
import static org.apache.ignite.raft.client.RaftClientMessages.AddLearnersRequest;
import static org.apache.ignite.raft.client.RaftClientMessages.AddPeerRequest;
@@ -35,30 +36,27 @@ import static org.apache.ignite.raft.client.RaftClientMessages.RemovePeerRespons
import static org.apache.ignite.raft.client.RaftClientMessages.ResetLearnersRequest;
import static org.apache.ignite.raft.client.RaftClientMessages.ResetPeerRequest;
import static org.apache.ignite.raft.client.RaftClientMessages.SnapshotRequest;
-import static org.apache.ignite.raft.client.RaftClientMessages.StatusResponse;
import static org.apache.ignite.raft.client.RaftClientMessages.TransferLeaderRequest;
/**
- * Low-level raft group RPC client.
- * <p>
- * Additionally maintains raft group state.
+ * Replicating group RPC client.
*/
public interface RaftGroupRpcClient {
/**
* @param groupId Group id.
* @return Group state snapshot.
*/
- State state(String groupId);
+ @NotNull State state(String groupId);
/**
- * Refreshes a group leader.
+ * Refreshes a replicating group leader.
* @param groupId Group id.
* @return A future.
*/
CompletableFuture<PeerId> refreshLeader(String groupId);
/**
- * Refreshes group members (but without a leader).
+ * Refreshes a replicating group members (except a leader).
* @param groupId Group id.
* @return A future.
*/
@@ -88,7 +86,7 @@ public interface RaftGroupRpcClient {
* @param request request data
* @return A future with result.
*/
- CompletableFuture<StatusResponse> resetPeers(PeerId peerId, ResetPeerRequest request);
+ CompletableFuture<Void> resetPeers(PeerId peerId, ResetPeerRequest request);
/**
* Takes a local snapshot.
@@ -98,7 +96,7 @@ public interface RaftGroupRpcClient {
* @param done callback
* @return a future with result
*/
- CompletableFuture<StatusResponse> snapshot(PeerId peerId, SnapshotRequest request);
+ CompletableFuture<Void> snapshot(PeerId peerId, SnapshotRequest request);
/**
* Change peers.
@@ -148,20 +146,20 @@ public interface RaftGroupRpcClient {
* @param done callback
* @return a future with result
*/
- CompletableFuture<StatusResponse> transferLeader(TransferLeaderRequest request);
+ CompletableFuture<Void> transferLeader(TransferLeaderRequest request);
/**
- * Performs a custom action defined by specific request on the raft group leader.
+ * Performs a user action defined by specific request to the raft group leader.
*
* @param endpoint server address
* @param request request data
* @param done callback
* @return a future with result
*/
- <R extends Message> CompletableFuture<R> sendCustom(RaftGroupMessage request);
+ <R> CompletableFuture<UserResponse<R>> sendUserRequest(UserRequest request);
/**
* @return A message builder factory.
*/
- ClientMessageBuilderFactory factory();
+ RaftClientMessageFactory factory();
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
index 4599556..4f46ed9 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
@@ -6,53 +6,43 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
+import org.apache.ignite.network.NetworkCluster;
+import org.apache.ignite.network.NetworkMember;
import org.apache.ignite.raft.PeerId;
-import org.apache.ignite.raft.RaftException;
import org.apache.ignite.raft.State;
import org.apache.ignite.raft.client.RaftClientMessages;
import org.apache.ignite.raft.client.RaftClientMessages.GetLeaderResponse;
-import org.apache.ignite.raft.client.RaftClientMessages.StatusResponse;
-import org.apache.ignite.raft.client.message.ClientMessageBuilderFactory;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
-import org.apache.ignite.raft.rpc.InvokeCallback;
-import org.apache.ignite.raft.rpc.Message;
-import org.apache.ignite.raft.rpc.Node;
-import org.apache.ignite.raft.rpc.RaftGroupMessage;
-import org.apache.ignite.raft.rpc.RpcClient;
import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.CompletableFuture.completedFuture;
public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
- private final ExecutorService executor;
- private final int defaultTimeout;
- private final RpcClient rpcClient;
+ private final NetworkCluster cluster;
/** Where to ask for initial configuration. */
- private final Set<Node> initialCfgNodes;
+ private final Set<NetworkMember> initialCfgNodes;
+
+ /** */
+ private final RaftClientMessageFactory factory;
/** */
- private final ClientMessageBuilderFactory factory;
+ private final int defaultTimeout;
private Map<String, StateImpl> states = new ConcurrentHashMap<>();
/**
* Accepts dependencies in constructor.
- * @param rpcClient
- * @param defaultTimeout
+ * @param cluster Cluster.
+ * @param defaultTimeout Default request timeout.
* @param initialCfgNode Initial configuration nodes.
*/
- public RaftGroupRpcClientImpl(RpcClient rpcClient, ClientMessageBuilderFactory factory, int defaultTimeout, Set<Node> initialCfgNodes) {
+ public RaftGroupRpcClientImpl(NetworkCluster cluster, RaftClientMessageFactory factory, int defaultTimeout, Set<NetworkMember> initialCfgNodes) {
this.defaultTimeout = defaultTimeout;
- this.rpcClient = rpcClient;
+ this.cluster = cluster;
this.factory = factory;
this.initialCfgNodes = new HashSet<>(initialCfgNodes);
- executor = Executors.newWorkStealingPool();
}
@Override public State state(String groupId) {
@@ -60,27 +50,17 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
}
private StateImpl getState(String groupId) {
- StateImpl newState = new StateImpl();
-
- StateImpl state = states.putIfAbsent(groupId, newState);
-
- if (state == null)
- state = newState;
-
- return state;
+ return states.computeIfAbsent(groupId, k -> new StateImpl());
}
@Override public CompletableFuture<PeerId> refreshLeader(String groupId) {
StateImpl state = getState(groupId);
- return refreshLeader(initialCfgNodes.iterator().next(), groupId).
- thenApply(resp -> {
- PeerId leaderId = resp.getLeaderId();
+ RaftClientMessages.GetLeaderRequest req = factory.createGetLeaderRequest().setGroupId(groupId).build();
- state.leader = leaderId;
+ CompletableFuture<GetLeaderResponse> fut = cluster.sendWithResponse(initialCfgNodes.iterator().next(), req, defaultTimeout);
- return leaderId;
- });
+ return fut.thenApply(resp -> state.leader = resp.getLeaderId());
}
@Override public CompletableFuture<State> refreshMembers(String groupId) {
@@ -95,11 +75,11 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
return null;
}
- @Override public CompletableFuture<StatusResponse> resetPeers(PeerId peerId, RaftClientMessages.ResetPeerRequest request) {
+ @Override public CompletableFuture<Void> resetPeers(PeerId peerId, RaftClientMessages.ResetPeerRequest request) {
return null;
}
- @Override public CompletableFuture<StatusResponse> snapshot(PeerId peerId, RaftClientMessages.SnapshotRequest request) {
+ @Override public CompletableFuture<Void> snapshot(PeerId peerId, RaftClientMessages.SnapshotRequest request) {
return null;
}
@@ -119,82 +99,23 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
return null;
}
- @Override public CompletableFuture<StatusResponse> transferLeader(RaftClientMessages.TransferLeaderRequest request) {
+ @Override public CompletableFuture<Void> transferLeader(RaftClientMessages.TransferLeaderRequest request) {
return null;
}
- private CompletableFuture<GetLeaderResponse> refreshLeader(Node node, String groupId) {
- StateImpl state = getState(groupId);
-
- while(true) {
- CompletableFuture<GetLeaderResponse> fut = state.updateFutRef.get();
-
- if (fut != null)
- return fut;
-
- if (state.updateFutRef.compareAndSet(null, (fut = new CompletableFuture<>()))) {
- RaftClientMessages.GetLeaderRequest req = factory.createGetLeaderRequest().setGroupId(groupId).build();
+ @Override public CompletableFuture<RaftClientMessages.UserResponse> sendUserRequest(RaftClientMessages.UserRequest request) {
+ if (request.getGroupId() == null)
+ throw new IllegalArgumentException("groupId is required");
- CompletableFuture<GetLeaderResponse> finalFut = fut;
-
- rpcClient.invokeAsync(node, req, new InvokeCallback<GetLeaderResponse>() {
- @Override public void complete(GetLeaderResponse response, Throwable err) {
- if (err != null)
- finalFut.completeExceptionally(err);
- else
- finalFut.complete(response);
-
- state.updateFutRef.set(null);
- }
- }, executor, defaultTimeout);
-
- return fut;
- }
- }
- }
-
- @Override public <R extends Message> CompletableFuture<R> sendCustom(RaftGroupMessage request) {
State state = state(request.getGroupId());
- CompletableFuture<R> fut = new CompletableFuture<>();
-
- fut.orTimeout(defaultTimeout, TimeUnit.MILLISECONDS);
-
CompletableFuture<PeerId> fut0 = state.leader() == null ?
refreshLeader(request.getGroupId()) : completedFuture(state.leader());
- fut0.whenComplete(new BiConsumer<PeerId, Throwable>() {
- @Override public void accept(PeerId peerId, Throwable error) {
- if (error == null) {
- rpcClient.invokeAsync(peerId.getNode(), request, new InvokeCallback<R>() {
- @Override public void complete(R response, Throwable err) {
- if (err != null)
- fut.completeExceptionally(err);
- else {
- if (response instanceof StatusResponse) {
- StatusResponse resp = (StatusResponse) response;
-
- // Translate error response to exception with the code.
- if (resp.getStatusCode() != 0)
- fut.completeExceptionally(new RaftException(resp.getStatusCode(), resp.getStatusMsg()));
- else
- fut.complete(response);
- } else
- fut.complete(response);
- }
- }
- }, executor, defaultTimeout);
- }
- else {
- fut.completeExceptionally(error);
- }
- }
- });
-
- return fut;
- }
-
- @Override public ClientMessageBuilderFactory factory() {
+ return fut0.thenCompose(peerId -> cluster.sendWithResponse(peerId.getNode(), request, defaultTimeout));
+ }
+
+ @Override public RaftClientMessageFactory factory() {
return this.factory;
}
@@ -205,8 +126,6 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
private volatile List<PeerId> learners;
- private AtomicReference<CompletableFuture<GetLeaderResponse>> updateFutRef = new AtomicReference<>();
-
@Override public @Nullable PeerId leader() {
return leader;
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java
index 2c5e04a..a4a8a59 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java
@@ -13,5 +13,5 @@ public interface RaftGroupClientRequestService {
* @param <R> Response.
* @return A future.
*/
- <T, R> CompletableFuture<R> submit(T request);
+ <R> CompletableFuture<R> submit(Object request);
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupManagmentService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupManagmentService.java
index d868dc9..2bd8fac 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupManagmentService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupManagmentService.java
@@ -57,7 +57,7 @@ public interface RaftGroupManagmentService {
* @param peerId Peer id.
* @param request request data
* @param done callback
- * @return a future with result
+ * @return a future with result.
*/
CompletableFuture<Void> snapshot(PeerId peerId);
@@ -77,7 +77,7 @@ public interface RaftGroupManagmentService {
* @param endpoint server address
* @param request request data
* @param done callback
- * @return a future with result
+ * @return a future with result.
*/
CompletableFuture<PeersChangeState> addLearners(List<PeerId> peers);
@@ -97,7 +97,7 @@ public interface RaftGroupManagmentService {
* @param endpoint server address
* @param request request data
* @param done callback
- * @return a future with result
+ * @return a future with result.
*/
CompletableFuture<PeersChangeState> resetLearners(List<PeerId> peers);
@@ -107,7 +107,7 @@ public interface RaftGroupManagmentService {
* @param endpoint server address
* @param request request data
* @param done callback
- * @return a future with result
+ * @return a future with result.
*/
CompletableFuture<Void> transferLeader(PeerId newLeader);
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java
index 16d63c4..c5b7c25 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java
@@ -1,11 +1,9 @@
package org.apache.ignite.raft.client.service.impl;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
import org.apache.ignite.raft.client.RaftClientMessages;
import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
import org.apache.ignite.raft.client.service.RaftGroupClientRequestService;
-import org.apache.ignite.raft.rpc.Message;
public class RaftGroupClientRequestServiceImpl implements RaftGroupClientRequestService {
private final RaftGroupRpcClient rpcClient;
@@ -16,16 +14,12 @@ public class RaftGroupClientRequestServiceImpl implements RaftGroupClientRequest
this.groupId = groupId;
}
- @Override public <T, R> CompletableFuture<R> submit(T request) {
+ @Override public <R> CompletableFuture<R> submit(Object request) {
RaftClientMessages.UserRequest r =
rpcClient.factory().createUserRequest().setRequest(request).setGroupId(groupId).build();
- return rpcClient.sendCustom(r).thenApply(new Function<Message, R>() {
- @Override public R apply(Message message) {
- RaftClientMessages.UserResponse<R> resp = (RaftClientMessages.UserResponse<R>) message;
+ CompletableFuture<RaftClientMessages.UserResponse<R>> completableFuture = rpcClient.sendUserRequest(r);
- return resp.response();
- }
- });
+ return completableFuture.thenApply(resp -> resp.response());
}
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java
index a70c1fb..28e28c8 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java
@@ -8,7 +8,7 @@ import org.apache.ignite.raft.client.service.RaftGroupManagmentService;
import org.jetbrains.annotations.Nullable;
public class RaftGroupManagementServiceImpl implements RaftGroupManagmentService {
- private RaftGroupRpcClient rpcClient;
+ private final RaftGroupRpcClient rpcClient;
public RaftGroupManagementServiceImpl(RaftGroupRpcClient rpcClient) {
this.rpcClient = rpcClient;
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java
deleted file mode 100644
index 743d519..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.raft.rpc;
-
-/**
- *
- */
-public interface InvokeCallback<T extends Message> {
- void complete(final T response, final Throwable err);
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Message.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Message.java
deleted file mode 100644
index 175d27b..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Message.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package org.apache.ignite.raft.rpc;
-
-public interface Message {
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java
deleted file mode 100644
index 966ff52..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.ignite.raft.rpc;
-
-import java.io.Serializable;
-
-/**
- * TODO FIXME asch must be elsewhere.
- */
-public interface Node extends Serializable {
- String id();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
deleted file mode 100644
index ea6f10c..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.raft.rpc;
-
-/**
- * An node with an immutable id.
- * TODO FIXME asch must be elsewhere.
- */
-public class NodeImpl implements Node {
- private final String id;
-
- public NodeImpl(String id) {
- super();
- this.id = id;
- }
-
- @Override public String id() {
- return id;
- }
-
- @Override public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- NodeImpl node = (NodeImpl) o;
-
- if (!id.equals(node.id)) return false;
-
- return true;
- }
-
- @Override public int hashCode() {
- return id.hashCode();
- }
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java
deleted file mode 100644
index 071360c..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.ignite.raft.rpc;
-
-/**
- * A message targeted to a specific raft group.
- */
-public interface RaftGroupMessage extends Message {
- public String getGroupId();
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java
deleted file mode 100644
index 4231dae..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.raft.rpc;
-
-import java.util.concurrent.Executor;
-
-/**
- *
- */
-public interface RpcClient {
- /**
- * Check connection for given address.
- *
- * @param node target address
- * @return true if there is a connection and the connection is active and writable.
- */
- boolean checkConnection(final Node node);
-
- /**
- * Check connection for given address and async to create a new one if there is no connection.
- *
- * @param node target address
- * @param createIfAbsent create a new one if there is no connection
- * @return true if there is a connection and the connection is active and writable.
- */
- boolean checkConnection(final Node node, final boolean createIfAbsent);
-
- /**
- * Close all connections of a address.
- *
- * @param node target address
- */
- void closeConnection(final Node node);
-
- /**
- * Asynchronous invocation with a callback.
- *
- * @param node Target node.
- * @param request Request object
- * @param callback Invoke callback.
- * @param executor Executor to run invoke callback.
- * @param timeoutMs Timeout millisecond.
- */
- void invokeAsync(final Node node, final Message request, InvokeCallback callback, Executor executor, final long timeoutMs);
-}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/MockUtils.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/MockUtils.java
new file mode 100644
index 0000000..9edc931
--- /dev/null
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/MockUtils.java
@@ -0,0 +1,71 @@
+package org.apache.ignite.raft.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.network.NetworkCluster;
+import org.apache.ignite.network.NetworkMember;
+import org.apache.ignite.raft.PeerId;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.apache.ignite.raft.client.message.RaftClientMessageFactoryImpl.INSTANCE;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+
+public class MockUtils {
+ public static PeerId LEADER = new PeerId(new NetworkMember("test"));
+
+ public static class TestInput1 {
+ }
+
+ public static class TestOutput1 {
+ }
+
+ public static class TestInput2 {
+ }
+
+ public static class TestOutput2 {
+ }
+
+ public static void mockUserInput1(NetworkCluster cluster) {
+ Mockito.doAnswer(new Answer() {
+ @Override public Object answer(InvocationOnMock invocation) throws Throwable {
+ RaftClientMessages.UserResponse resp = INSTANCE.createUserResponse().setResponse(new TestOutput1()).build();
+
+ return CompletableFuture.completedFuture(resp);
+ }
+ }).when(cluster).sendWithResponse(eq(LEADER.getNode()), argThat(new ArgumentMatcher<RaftClientMessages.UserRequest>() {
+ @Override public boolean matches(RaftClientMessages.UserRequest arg) {
+ return arg.request() instanceof TestInput1;
+ }
+ }), anyLong());
+ }
+
+ public static void mockUserInput2(NetworkCluster cluster) {
+ Mockito.doAnswer(new Answer() {
+ @Override public Object answer(InvocationOnMock invocation) throws Throwable {
+ RaftClientMessages.UserResponse resp = INSTANCE.createUserResponse().setResponse(new TestOutput2()).build();
+
+ return CompletableFuture.completedFuture(resp);
+ }
+ }).when(cluster).sendWithResponse(eq(LEADER.getNode()), argThat(new ArgumentMatcher<RaftClientMessages.UserRequest>() {
+ @Override public boolean matches(RaftClientMessages.UserRequest arg) {
+ return arg.request() instanceof TestInput2;
+ }
+ }), anyLong());
+ }
+
+ public static void mockLeaderRequest(NetworkCluster cluster, boolean timeout) {
+ Mockito.doAnswer(new Answer() {
+ @Override public Object answer(InvocationOnMock invocation) throws Throwable {
+ RaftClientMessages.GetLeaderResponse resp = INSTANCE.createGetLeaderResponse().setLeaderId(LEADER).build();
+
+ return timeout ? CompletableFuture.failedFuture(new TimeoutException()) : CompletableFuture.completedFuture(resp);
+ }
+ }).when(cluster).sendWithResponse(eq(LEADER.getNode()), any(RaftClientMessages.GetLeaderRequest.class), anyLong());
+ }
+}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClientTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClientTest.java
index 5247c59..8ab61e3 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClientTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClientTest.java
@@ -2,62 +2,56 @@ package org.apache.ignite.raft.client.rpc;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
+import org.apache.ignite.network.NetworkCluster;
import org.apache.ignite.raft.PeerId;
-import org.apache.ignite.raft.State;
-import org.apache.ignite.raft.client.RaftClientMessages.GetLeaderRequest;
+import org.apache.ignite.raft.client.MockUtils.TestInput1;
+import org.apache.ignite.raft.client.MockUtils.TestInput2;
+import org.apache.ignite.raft.client.MockUtils.TestOutput1;
+import org.apache.ignite.raft.client.MockUtils.TestOutput2;
+import org.apache.ignite.raft.client.RaftClientMessages;
import org.apache.ignite.raft.client.rpc.impl.RaftGroupRpcClientImpl;
-import org.apache.ignite.raft.rpc.InvokeCallback;
-import org.apache.ignite.raft.rpc.Message;
-import org.apache.ignite.raft.rpc.NodeImpl;
-import org.apache.ignite.raft.rpc.RaftGroupMessage;
-import org.apache.ignite.raft.rpc.RpcClient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.stubbing.Answer;
import static java.util.Collections.singleton;
-import static org.apache.ignite.raft.client.message.RaftClientMessageBuilderFactory.INSTANCE;
+import static org.apache.ignite.raft.client.MockUtils.LEADER;
+import static org.apache.ignite.raft.client.MockUtils.mockLeaderRequest;
+import static org.apache.ignite.raft.client.MockUtils.mockUserInput1;
+import static org.apache.ignite.raft.client.MockUtils.mockUserInput2;
+import static org.apache.ignite.raft.client.message.RaftClientMessageFactoryImpl.INSTANCE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
@ExtendWith(MockitoExtension.class)
public class RaftGroupRpcClientTest {
@Mock
- private RpcClient rpcClient;
-
- private static PeerId leader = new PeerId(new NodeImpl("test"));
+ private NetworkCluster cluster;
@Test
public void testRefreshLeader() throws Exception {
String groupId = "test";
- mockLeaderRequest(false);
+ mockLeaderRequest(cluster, false);
- RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, INSTANCE, 5_000, singleton(leader.getNode()));
+ RaftGroupRpcClient client = new RaftGroupRpcClientImpl(cluster, INSTANCE, 5_000, singleton(LEADER.getNode()));
PeerId leaderId = client.refreshLeader(groupId).get();
- assertEquals(leader, client.state(groupId).leader());
- assertEquals(leader, leaderId);
+ assertEquals(LEADER, client.state(groupId).leader());
+ assertEquals(LEADER, leaderId);
}
@Test
public void testRefreshLeaderMultithreaded() throws Exception {
String groupId = "test";
- mockLeaderRequest(false);
+ mockLeaderRequest(cluster, false);
- RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, INSTANCE, 5_000, singleton(leader.getNode()));
+ RaftGroupRpcClient client = new RaftGroupRpcClientImpl(cluster, INSTANCE, 5_000, singleton(LEADER.getNode()));
int cnt = 20;
@@ -79,8 +73,8 @@ public class RaftGroupRpcClientTest {
try {
PeerId leaderId = client.refreshLeader(groupId).get();
- assertEquals(leader, client.state(groupId).leader());
- assertEquals(leader, leaderId);
+ assertEquals(LEADER, client.state(groupId).leader());
+ assertEquals(LEADER, leaderId);
}
catch (Exception e) {
fail(e);
@@ -99,10 +93,9 @@ public class RaftGroupRpcClientTest {
public void testRefreshLeaderTimeout() throws Exception {
String groupId = "test";
- mockLeaderRequest(true);
+ mockLeaderRequest(cluster, true);
- RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, INSTANCE,
- 5_000, singleton(leader.getNode()));
+ RaftGroupRpcClient client = new RaftGroupRpcClientImpl(cluster, INSTANCE, 5_000, singleton(LEADER.getNode()));
try {
client.refreshLeader(groupId).get();
@@ -118,66 +111,23 @@ public class RaftGroupRpcClientTest {
public void testCustomMessage() throws Exception {
String groupId = "test";
- mockLeaderRequest(false);
- mockCustomRequest();
-
- RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, INSTANCE,
- 5_000, singleton(leader.getNode()));
-
- JunkRequest req = new JunkRequest(groupId);
-
- Message resp = client.sendCustom(req).get();
-
- State state = client.state(groupId);
-
- // Expecting raft group state to be transparently loaded on first request.
- assertEquals(leader, state.leader());
-
- assertTrue(resp instanceof JunkResponse);
- }
-
- private static class JunkRequest implements RaftGroupMessage {
- private final String groupId;
-
- JunkRequest(String groupId) {
- this.groupId = groupId;
- }
+ mockLeaderRequest(cluster, false);
+ mockUserInput1(cluster);
+ mockUserInput2(cluster);
- @Override public String getGroupId() {
- return groupId;
- }
- }
+ RaftGroupRpcClient client = new RaftGroupRpcClientImpl(cluster, INSTANCE, 5_000, singleton(LEADER.getNode()));
- private static class JunkResponse implements Message {}
+ RaftClientMessages.UserRequest req1 =
+ client.factory().createUserRequest().setGroupId(groupId).setRequest(new TestInput1()).build();
- private void mockCustomRequest() {
- Mockito.doAnswer(new Answer() {
- @Override public Object answer(InvocationOnMock invocation) throws Throwable {
- InvokeCallback callback = invocation.getArgument(2);
- Executor executor = invocation.getArgument(3);
+ assertTrue(client.sendUserRequest(req1).get().response() instanceof TestOutput1);
- executor.execute(() -> callback.complete(new JunkResponse(), null));
+ RaftClientMessages.UserRequest req2 =
+ client.factory().createUserRequest().setGroupId(groupId).setRequest(new TestInput2()).build();
- return null;
- }
- }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(JunkRequest.class), any(), any(), anyLong());
- }
+ assertTrue(client.sendUserRequest(req2).get().response() instanceof TestOutput2);
- private void mockLeaderRequest(boolean timeout) {
- Mockito.doAnswer(new Answer() {
- @Override public Object answer(InvocationOnMock invocation) throws Throwable {
- InvokeCallback callback = invocation.getArgument(2);
- Executor executor = invocation.getArgument(3);
-
- executor.execute(() -> {
- if (timeout)
- callback.complete(null, new TimeoutException());
- else
- callback.complete(INSTANCE.createGetLeaderResponse().setLeaderId(leader).build(), null);
- });
-
- return null;
- }
- }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(GetLeaderRequest.class), any(), any(), anyLong());
+ // Expecting raft group state to be transparently loaded on first request.
+ assertEquals(LEADER, client.state(groupId).leader());
}
}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestServiceTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestServiceTest.java
index 56e3ff8..1c802a0 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestServiceTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestServiceTest.java
@@ -1,51 +1,43 @@
package org.apache.ignite.raft.client.service;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import org.apache.ignite.raft.PeerId;
-import org.apache.ignite.raft.client.RaftClientMessages;
-import org.apache.ignite.raft.client.RaftClientMessages.UserRequest;
+import org.apache.ignite.network.NetworkCluster;
+import org.apache.ignite.raft.client.MockUtils.TestInput1;
+import org.apache.ignite.raft.client.MockUtils.TestInput2;
+import org.apache.ignite.raft.client.MockUtils.TestOutput1;
+import org.apache.ignite.raft.client.MockUtils.TestOutput2;
import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
import org.apache.ignite.raft.client.rpc.impl.RaftGroupRpcClientImpl;
import org.apache.ignite.raft.client.service.impl.RaftGroupClientRequestServiceImpl;
-import org.apache.ignite.raft.rpc.InvokeCallback;
-import org.apache.ignite.raft.rpc.NodeImpl;
-import org.apache.ignite.raft.rpc.RpcClient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.stubbing.Answer;
import static java.util.Collections.singleton;
-import static org.apache.ignite.raft.client.message.RaftClientMessageBuilderFactory.INSTANCE;
+import static org.apache.ignite.raft.client.MockUtils.LEADER;
+import static org.apache.ignite.raft.client.MockUtils.mockLeaderRequest;
+import static org.apache.ignite.raft.client.MockUtils.mockUserInput1;
+import static org.apache.ignite.raft.client.MockUtils.mockUserInput2;
+import static org.apache.ignite.raft.client.message.RaftClientMessageFactoryImpl.INSTANCE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.ArgumentMatchers.eq;
@ExtendWith(MockitoExtension.class)
public class RaftGroupClientRequestServiceTest {
@Mock
- private RpcClient rpcClient;
-
- private static PeerId leader = new PeerId(new NodeImpl("test"));
+ private NetworkCluster cluster;
@Test
public void testUserRequest() throws Exception {
String groupId = "test";
- mockLeaderRequest();
- mockUserRequest1();
- mockUserRequest2();
+ mockLeaderRequest(cluster, false);
+ mockUserInput1(cluster);
+ mockUserInput2(cluster);
- RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, INSTANCE, 5_000, singleton(leader.getNode()));
+ RaftGroupRpcClient client = new RaftGroupRpcClientImpl(cluster, INSTANCE, 5_000, singleton(LEADER.getNode()));
RaftGroupClientRequestService service = new RaftGroupClientRequestServiceImpl(client, groupId);
@@ -63,70 +55,6 @@ public class RaftGroupClientRequestServiceTest {
assertNotNull(output2);
- assertEquals(leader, client.state(groupId).leader());
- }
-
- private static class TestInput1 {
- }
-
- private static class TestOutput1 {
- }
-
- private static class TestInput2 {
- }
-
- private static class TestOutput2 {
- }
-
- private void mockLeaderRequest() {
- Mockito.doAnswer(new Answer() {
- @Override public Object answer(InvocationOnMock invocation) throws Throwable {
- InvokeCallback callback = invocation.getArgument(2);
- Executor executor = invocation.getArgument(3);
-
- executor.execute(() -> {
- callback.complete(INSTANCE.createGetLeaderResponse().setLeaderId(leader).build(), null);
- });
-
- return null;
- }
- }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(RaftClientMessages.GetLeaderRequest.class),
- any(), any(), anyLong());
- }
-
- private void mockUserRequest1() {
- Mockito.doAnswer(new Answer() {
- @Override public Object answer(InvocationOnMock invocation) throws Throwable {
- InvokeCallback callback = invocation.getArgument(2);
- Executor executor = invocation.getArgument(3);
-
- executor.execute(() -> callback.complete(INSTANCE.createUserResponse().
- setResponse(new TestOutput1()).build(), null));
-
- return null;
- }
- }).when(rpcClient).invokeAsync(eq(leader.getNode()), argThat(new ArgumentMatcher<UserRequest>() {
- @Override public boolean matches(UserRequest argument) {
- return argument.request() instanceof TestInput1;
- }
- }), any(), any(), anyLong());
- }
-
- private void mockUserRequest2() {
- Mockito.doAnswer(new Answer() {
- @Override public Object answer(InvocationOnMock invocation) throws Throwable {
- InvokeCallback callback = invocation.getArgument(2);
- Executor executor = invocation.getArgument(3);
-
- executor.execute(() -> callback.complete(INSTANCE.createUserResponse().
- setResponse(new TestOutput2()).build(), null));
-
- return null;
- }
- }).when(rpcClient).invokeAsync(eq(leader.getNode()), argThat(new ArgumentMatcher<UserRequest>() {
- @Override public boolean matches(UserRequest argument) {
- return argument.request() instanceof TestInput2;
- }
- }), any(), any(), anyLong());
+ assertEquals(LEADER, client.state(groupId).leader());
}
}