You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/27 11:23:47 UTC
[ignite-3] 01/04: IGNITE-14149 Cleanup.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch ignite-14149
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 092bc39b09739a5171b11316cec0c96615dfb13b
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Sat Feb 27 12:32:06 2021 +0300
IGNITE-14149 Cleanup.
---
.../main/java/org/apache/ignite/raft/PeerId.java | 4 +
.../main/java/org/apache/ignite/raft/State.java | 265 +--------------------
.../raft/client/RaftClientCommonMessages.java | 2 +-
.../RaftClientCommonMessageBuilderFactory.java | 2 -
.../ignite/raft/client/rpc/RaftGroupRpcClient.java | 6 +-
.../client/rpc/impl/RaftGroupRpcClientImpl.java | 117 +++++----
.../service/RaftGroupClientRequestListener.java | 2 +-
.../service/RaftGroupClientRequestService.java | 7 +
.../impl/RaftGroupManagementServiceImpl.java | 8 +-
.../java/org/apache/ignite/raft/rpc/NodeImpl.java | 2 +-
.../apache/ignite/raft/rpc/RaftGroupMessage.java | 3 +
.../java/org/apache/ignite/raft/rpc/RpcClient.java | 8 +-
.../ignite/raft/client/RaftGroupRpcClientTest.java | 116 ++++++++-
13 files changed, 212 insertions(+), 330 deletions(-)
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
index 56d96be..7b5fec0 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/PeerId.java
@@ -75,4 +75,8 @@ public class PeerId implements Serializable {
result = 31 * result + priority;
return result;
}
+
+ @Override public String toString() {
+ return node.id() + ":" + priority;
+ }
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/State.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/State.java
index 5f5fb15..5001d74 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/State.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/State.java
@@ -16,271 +16,16 @@
*/
package org.apache.ignite.raft;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
-import java.util.Set;
+import org.jetbrains.annotations.Nullable;
/**
* Raft group state.
*/
-public class State implements Iterable<PeerId> {
- /** Mark a leaner peer */
- private static final String LEARNER_POSTFIX = "/learner";
+public interface State {
+ @Nullable PeerId leader();
- private volatile PeerId leader;
+ @Nullable List<PeerId> peers();
- private List<PeerId> peers = new ArrayList<>();
-
- // use LinkedHashSet to keep insertion order.
- private List<PeerId> learners = new ArrayList<>();
-
- public State() {
- super();
- }
-
- /**
- * Construct a configuration instance with peers.
- *
- * @param conf configuration
- */
- public State(final Iterable<PeerId> conf) {
- this(conf, null);
- }
-
- /**
- * Construct a configuration from another conf.
- *
- * @param conf configuration
- */
- public State(final State conf) {
- this(conf.getPeers(), conf.getLearners());
- }
-
- /**
- * Construct a Configuration instance with peers and learners.
- *
- * @param conf peers configuration
- * @param learners learners
- * @since 1.3.0
- */
- public State(final Iterable<PeerId> conf, final Iterable<PeerId> learners) {
- for (final PeerId peer : conf) {
- this.peers.add(new PeerId(peer));
- }
- addLearners(learners);
- }
-
- public PeerId getLeader() {
- return leader;
- }
-
- public void setLeader(PeerId leader) {
- this.leader = leader;
- }
-
- public void setLearners(final List<PeerId> learners) {
- this.learners = learners;
- }
-
- /**
- * Add a learner peer.
- *
- * @param learner learner to add
- * @return true when add successfully.
- */
- public boolean addLearner(final PeerId learner) {
- return this.learners.add(learner);
- }
-
- /**
- * Add learners in batch, returns the added count.
- *
- * @param learners learners to add
- * @return the total added count
- */
- public int addLearners(final Iterable<PeerId> learners) {
- int ret = 0;
- if (learners != null) {
- for (final PeerId peer : learners) {
- if (this.learners.add(new PeerId(peer))) {
- ret++;
- }
- }
- }
- return ret;
- }
-
- /**
- * Remove a learner peer.
- *
- * @param learner learner to remove
- * @return true when remove successfully.
- */
- public boolean removeLearner(final PeerId learner) {
- return this.learners.remove(learner);
- }
-
- /**
- * Retrieve the learners set.
- *
- * @return learners
- */
- public List<PeerId> getLearners() {
- return this.learners;
- }
-
- /**
- * Retrieve the learners set copy.
- *
- * @return learners
- */
- public List<PeerId> listLearners() {
- return new ArrayList<>(this.learners);
- }
-
- /**
- * Returns true when the configuration is valid.
- *
- * @return true if the configuration is valid.
- */
- public boolean isValid() {
- final Set<PeerId> intersection = new HashSet<>(this.peers);
- intersection.retainAll(this.learners);
- return !this.peers.isEmpty() && intersection.isEmpty();
- }
-
- public void reset() {
- this.peers.clear();
- this.learners.clear();
- }
-
- public boolean isEmpty() {
- return this.peers.isEmpty();
- }
-
- /**
- * Returns the peers total number.
- *
- * @return total num of peers
- */
- public int size() {
- return this.peers.size();
- }
-
- @Override
- public Iterator<PeerId> iterator() {
- return this.peers.iterator();
- }
-
- public Set<PeerId> getPeerSet() {
- return new HashSet<>(this.peers);
- }
-
- public List<PeerId> listPeers() {
- return new ArrayList<>(this.peers);
- }
-
- public List<PeerId> getPeers() {
- return this.peers;
- }
-
- public void setPeers(final List<PeerId> peers) {
- this.peers.clear();
- for (final PeerId peer : peers) {
- this.peers.add(new PeerId(peer));
- }
- }
-
- public void appendPeers(final Collection<PeerId> set) {
- this.peers.addAll(set);
- }
-
- public boolean addPeer(final PeerId peer) {
- return this.peers.add(peer);
- }
-
- public boolean removePeer(final PeerId peer) {
- return this.peers.remove(peer);
- }
-
- public boolean contains(final PeerId peer) {
- return this.peers.contains(peer);
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((this.learners == null) ? 0 : this.learners.hashCode());
- result = prime * result + ((this.peers == null) ? 0 : this.peers.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- State other = (State) obj;
- if (this.learners == null) {
- if (other.learners != null) {
- return false;
- }
- } else if (!this.learners.equals(other.learners)) {
- return false;
- }
- if (this.peers == null) {
- return other.peers == null;
- } else {
- return this.peers.equals(other.peers);
- }
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- final List<PeerId> peers = listPeers();
- int i = 0;
- int size = peers.size();
- for (final PeerId peer : peers) {
- sb.append(peer);
- if (i < size - 1 || !this.learners.isEmpty()) {
- sb.append(",");
- }
- i++;
- }
-
- size = this.learners.size();
- i = 0;
- for (final PeerId peer : this.learners) {
- sb.append(peer).append(LEARNER_POSTFIX);
- if (i < size - 1) {
- sb.append(",");
- }
- i++;
- }
-
- return sb.toString();
- }
-
- /**
- * Get the difference between |*this| and |rhs|
- * |included| would be assigned to |*this| - |rhs|
- * |excluded| would be assigned to |rhs| - |*this|
- */
- public void diff(final State rhs, final State included, final State excluded) {
- included.peers = new ArrayList<>(this.peers);
- included.peers.removeAll(rhs.peers);
- excluded.peers = new ArrayList<>(rhs.peers);
- excluded.peers.removeAll(this.peers);
- }
+ @Nullable List<PeerId> learners();
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientCommonMessages.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientCommonMessages.java
index 30b1cb5..5a40dc6 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientCommonMessages.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientCommonMessages.java
@@ -257,7 +257,7 @@ public final class RaftClientCommonMessages {
}
}
- public interface UserRequest<T> extends Message {
+ public interface UserRequest<T> extends RaftGroupMessage {
T request();
public interface Builder<T> {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientCommonMessageBuilderFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientCommonMessageBuilderFactory.java
index 45f070a..440cee2 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientCommonMessageBuilderFactory.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientCommonMessageBuilderFactory.java
@@ -6,8 +6,6 @@ import org.apache.ignite.raft.client.RaftClientCommonMessages;
* Raft client message builders factory.
*/
public class RaftClientCommonMessageBuilderFactory implements ClientMessageBuilderFactory {
- public static RaftClientCommonMessageBuilderFactory DEFAULT = new RaftClientCommonMessageBuilderFactory();
-
@Override public RaftClientCommonMessages.PingRequest.Builder createPingRequest() {
return new PingRequestImpl();
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
index 986fd71..5f657eb 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
@@ -46,12 +46,12 @@ import static org.apache.ignite.raft.client.RaftClientCommonMessages.TransferLea
public interface RaftGroupRpcClient {
/**
* @param groupId Group id.
- * @return Current group state or null if state is not yet initalized.
+ * @return Group state snapshot.
*/
- @Nullable State state(String groupId);
+ State state(String groupId);
/**
- * Refreshes a state of initialized group.
+ * Refreshes a group leader.
* @param groupId Group id.
* @return A future.
*/
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
index d7dc4c8..f0ceda6 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
@@ -1,6 +1,7 @@
package org.apache.ignite.raft.client.rpc.impl;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -8,20 +9,25 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
-import java.util.function.Function;
import org.apache.ignite.raft.PeerId;
import org.apache.ignite.raft.RaftException;
import org.apache.ignite.raft.State;
import org.apache.ignite.raft.client.RaftClientCommonMessages;
+import org.apache.ignite.raft.client.RaftClientCommonMessages.GetLeaderResponse;
import org.apache.ignite.raft.client.RaftClientCommonMessages.StatusResponse;
+import org.apache.ignite.raft.client.message.ClientMessageBuilderFactory;
import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
-import org.apache.ignite.raft.client.message.RaftClientCommonMessageBuilderFactory;
import org.apache.ignite.raft.rpc.InvokeCallback;
import org.apache.ignite.raft.rpc.Message;
import org.apache.ignite.raft.rpc.Node;
import org.apache.ignite.raft.rpc.RaftGroupMessage;
import org.apache.ignite.raft.rpc.RpcClient;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.raft.client.message.RaftClientCommonMessageBuilderFactory.DEFAULT;
public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
private final ExecutorService executor;
@@ -31,7 +37,10 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
/** Where to ask for initial configuration. */
private final Set<Node> initialCfgNodes;
- private Map<String, State> states = new ConcurrentHashMap<>();
+ /** */
+ private final ClientMessageBuilderFactory factory;
+
+ private Map<String, StateImpl> states = new ConcurrentHashMap<>();
/**
* Accepts dependencies in constructor.
@@ -39,17 +48,22 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
* @param defaultTimeout
* @param initialCfgNode Initial configuration nodes.
*/
- public RaftGroupRpcClientImpl(RpcClient rpcClient, int defaultTimeout, Set<Node> initialCfgNodes) {
+ public RaftGroupRpcClientImpl(RpcClient rpcClient, ClientMessageBuilderFactory factory, int defaultTimeout, Set<Node> initialCfgNodes) {
this.defaultTimeout = defaultTimeout;
this.rpcClient = rpcClient;
+ this.factory = factory;
this.initialCfgNodes = new HashSet<>(initialCfgNodes);
executor = Executors.newWorkStealingPool();
}
@Override public State state(String groupId) {
- State newState = new State();
+ return getState(groupId);
+ }
+
+ private StateImpl getState(String groupId) {
+ StateImpl newState = new StateImpl();
- State state = states.putIfAbsent(groupId, newState);
+ StateImpl state = states.putIfAbsent(groupId, newState);
if (state == null)
state = newState;
@@ -58,28 +72,16 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
}
@Override public CompletableFuture<PeerId> refreshLeader(String groupId) {
- State state = state(groupId);
+ StateImpl state = getState(groupId);
- if (state.getLeader() == null) {
- synchronized (state) {
- PeerId leader = state.getLeader();
+ return refreshLeader(initialCfgNodes.iterator().next(), groupId).
+ thenApply(resp -> {
+ PeerId leaderId = resp.getLeaderId();
- if (leader == null) {
- return refreshLeader(initialCfgNodes.iterator().next(), groupId).
- thenApply(new Function<RaftClientCommonMessages.GetLeaderResponse, PeerId>() {
- @Override public PeerId apply(RaftClientCommonMessages.GetLeaderResponse getLeaderResponse) {
- PeerId leaderId = getLeaderResponse.getLeaderId();
+ state.leader = leaderId;
- state.setLeader(leaderId);
-
- return leaderId;
- }
- });
- }
- }
- }
-
- return CompletableFuture.completedFuture(state.getLeader());
+ return leaderId;
+ });
}
@Override public CompletableFuture<RaftClientCommonMessages.AddPeerResponse> addPeer(RaftClientCommonMessages.AddPeerRequest request) {
@@ -118,21 +120,34 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
return null;
}
- private CompletableFuture<RaftClientCommonMessages.GetLeaderResponse> refreshLeader(Node node, String groupId) {
- RaftClientCommonMessages.GetLeaderRequest req = RaftClientCommonMessageBuilderFactory.DEFAULT.createGetLeaderRequest().setGroupId(groupId).build();
+ private CompletableFuture<GetLeaderResponse> refreshLeader(Node node, String groupId) {
+ StateImpl state = getState(groupId);
- CompletableFuture<RaftClientCommonMessages.GetLeaderResponse> fut = new CompletableFuture<>();
+ while(true) {
+ CompletableFuture<GetLeaderResponse> fut = state.updateFutRef.get();
- rpcClient.invokeAsync(node, req, new InvokeCallback<RaftClientCommonMessages.GetLeaderResponse>() {
- @Override public void complete(RaftClientCommonMessages.GetLeaderResponse response, Throwable err) {
- if (err != null)
- fut.completeExceptionally(err);
- else
- fut.complete(response);
- }
- }, executor, defaultTimeout);
+ if (fut != null)
+ return fut;
- return fut;
+ if (state.updateFutRef.compareAndSet(null, (fut = new CompletableFuture<>()))) {
+ RaftClientCommonMessages.GetLeaderRequest req = DEFAULT.createGetLeaderRequest().setGroupId(groupId).build();
+
+ CompletableFuture<GetLeaderResponse> finalFut = fut;
+
+ rpcClient.invokeAsync(node, req, new InvokeCallback<GetLeaderResponse>() {
+ @Override public void complete(GetLeaderResponse response, Throwable err) {
+ if (err != null)
+ finalFut.completeExceptionally(err);
+ else
+ finalFut.complete(response);
+
+ state.updateFutRef.set(null);
+ }
+ }, executor, defaultTimeout);
+
+ return fut;
+ }
+ }
}
@Override public <R extends Message> CompletableFuture<R> sendCustom(RaftGroupMessage request) {
@@ -142,13 +157,13 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
fut.orTimeout(defaultTimeout, TimeUnit.MILLISECONDS);
- CompletableFuture<PeerId> fut0 = refreshLeader(request.getGroupId());
+ CompletableFuture<PeerId> fut0 = state.leader() == null ?
+ refreshLeader(request.getGroupId()) : completedFuture(state.leader());
- // TODO implement clean chaining.
fut0.whenComplete(new BiConsumer<PeerId, Throwable>() {
@Override public void accept(PeerId peerId, Throwable error) {
if (error == null) {
- rpcClient.invokeAsync(state.getLeader().getNode(), request, new InvokeCallback<R>() {
+ rpcClient.invokeAsync(peerId.getNode(), request, new InvokeCallback<R>() {
@Override public void complete(R response, Throwable err) {
if (err != null)
fut.completeExceptionally(err);
@@ -175,4 +190,26 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
return fut;
}
+
+ private static class StateImpl implements State {
+ private volatile PeerId leader;
+
+ private volatile List<PeerId> peers;
+
+ private volatile List<PeerId> learners;
+
+ private AtomicReference<CompletableFuture<GetLeaderResponse>> updateFutRef = new AtomicReference<>();
+
+ @Override public @Nullable PeerId leader() {
+ return leader;
+ }
+
+ @Override public @Nullable List<PeerId> peers() {
+ return peers;
+ }
+
+ @Override public @Nullable List<PeerId> learners() {
+ return learners;
+ }
+ }
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestListener.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestListener.java
index 3649a41..c007086 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestListener.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestListener.java
@@ -3,7 +3,7 @@ package org.apache.ignite.raft.client.service;
import java.util.Iterator;
/**
- * A listener for raft group clien requests.
+ * A listener for raft group client requests.
*/
public interface RaftGroupClientRequestListener {
void onReads(Iterator iterator);
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java
index 9fdc19e..a771bc9 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupClientRequestService.java
@@ -6,5 +6,12 @@ import java.util.concurrent.CompletableFuture;
*
*/
public interface RaftGroupClientRequestService {
+ /**
+ * Submits a custom request to a raft group leader. If a leader is not known yet, will try to resolve the leader.
+ * @param request Custom request to submit.
+ * @param <T> Request type.
+ * @param <R> Response type.
+ * @return A future.
+ */
<T, R> CompletableFuture<R> submit(T request);
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java
index ff3749f..a70c1fb 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupManagementServiceImpl.java
@@ -3,10 +3,8 @@ package org.apache.ignite.raft.client.service.impl;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.raft.PeerId;
-import org.apache.ignite.raft.State;
import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
import org.apache.ignite.raft.client.service.RaftGroupManagmentService;
-import org.apache.ignite.raft.rpc.Node;
import org.jetbrains.annotations.Nullable;
public class RaftGroupManagementServiceImpl implements RaftGroupManagmentService {
@@ -17,15 +15,15 @@ public class RaftGroupManagementServiceImpl implements RaftGroupManagmentService
}
@Override public @Nullable PeerId getLeader(String groupId) {
- return rpcClient.state(groupId).getLeader();
+ return rpcClient.state(groupId).leader();
}
@Override public @Nullable List<PeerId> getPeers(String groupId) {
- return rpcClient.state(groupId).getPeers();
+ return rpcClient.state(groupId).peers();
}
@Override public @Nullable List<PeerId> getLearners(String groupId) {
- return rpcClient.state(groupId).getLearners();
+ return rpcClient.state(groupId).learners();
}
@Override public CompletableFuture<PeersChangeState> addPeer(PeerId peerId) {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
index f71424f..ea6f10c 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
@@ -29,7 +29,7 @@ public class NodeImpl implements Node {
}
@Override public String id() {
- return null;
+ return id;
}
@Override public boolean equals(Object o) {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java
index 0ee4561..97502ce 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RaftGroupMessage.java
@@ -1,5 +1,8 @@
package org.apache.ignite.raft.rpc;
+/**
+ * A message for a specific raft group.
+ */
public interface RaftGroupMessage extends Message {
public String getGroupId();
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java
index cdb6db7..4231dae 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcClient.java
@@ -28,7 +28,7 @@ public interface RpcClient {
* @param node target address
* @return true if there is a connection and the connection is active and writable.
*/
- boolean checkConnection(final NodeImpl node);
+ boolean checkConnection(final Node node);
/**
* Check connection for given address and async to create a new one if there is no connection.
@@ -37,14 +37,14 @@ public interface RpcClient {
* @param createIfAbsent create a new one if there is no connection
* @return true if there is a connection and the connection is active and writable.
*/
- boolean checkConnection(final NodeImpl node, final boolean createIfAbsent);
+ boolean checkConnection(final Node node, final boolean createIfAbsent);
/**
* Close all connections of a address.
*
* @param node target address
*/
- void closeConnection(final NodeImpl node);
+ void closeConnection(final Node node);
/**
* Asynchronous invocation with a callback.
@@ -53,7 +53,7 @@ public interface RpcClient {
* @param request Request object
* @param callback Invoke callback.
* @param executor Executor to run invoke callback.
- * @param timeoutMs Timeout millisecond
+ * @param timeoutMs Timeout millisecond.
*/
void invokeAsync(final Node node, final Message request, InvokeCallback callback, Executor executor, final long timeoutMs);
}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
index e7012e2..d755e86 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
@@ -1,10 +1,14 @@
package org.apache.ignite.raft.client;
import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.raft.PeerId;
import org.apache.ignite.raft.State;
+import org.apache.ignite.raft.client.RaftClientCommonMessages.GetLeaderRequest;
import org.apache.ignite.raft.client.rpc.impl.RaftGroupRpcClientImpl;
import org.apache.ignite.raft.client.message.RaftClientCommonMessageBuilderFactory;
import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
@@ -15,6 +19,7 @@ import org.apache.ignite.raft.rpc.RaftGroupMessage;
import org.apache.ignite.raft.rpc.RpcClient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.parallel.Execution;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@@ -23,6 +28,7 @@ import org.mockito.stubbing.Answer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
@@ -35,25 +41,104 @@ public class RaftGroupRpcClientTest {
private static PeerId leader = new PeerId(new NodeImpl("test"));
@Test
+ public void testRefreshLeader() throws Exception {
+ String groupId = "test";
+
+ mockLeaderRequest(false);
+
+ RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, new RaftClientCommonMessageBuilderFactory(),
+ 5_000, Collections.singleton(leader.getNode()));
+
+ PeerId leaderId = client.refreshLeader(groupId).get();
+
+ assertEquals(leader, client.state(groupId).leader());
+ assertEquals(leader, leaderId);
+ }
+
+ @Test
+ public void testRefreshLeaderMultithreaded() throws Exception {
+ String groupId = "test";
+
+ mockLeaderRequest(false);
+
+ RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, new RaftClientCommonMessageBuilderFactory(),
+ 5_000, Collections.singleton(leader.getNode()));
+
+ int cnt = 20;
+
+ Thread[] runners = new Thread[cnt];
+
+ CountDownLatch l = new CountDownLatch(cnt);
+
+ for (int i = 0; i < runners.length; i++) {
+ runners[i] = new Thread(new Runnable() {
+ @Override public void run() {
+ l.countDown();
+ try {
+ l.await();
+ }
+ catch (InterruptedException e) {
+ // Ignored.
+ }
+
+ try {
+ PeerId leaderId = client.refreshLeader(groupId).get();
+
+ assertEquals(leader, client.state(groupId).leader());
+ assertEquals(leader, leaderId);
+ }
+ catch (Exception e) {
+ fail(e);
+ }
+ }
+ });
+ runners[i].setName("Executor-" + i);
+ runners[i].start();
+ }
+
+ for (int i = 0; i < runners.length; i++)
+ runners[i].join();
+ }
+
+ @Test
+ public void testRefreshLeaderTimeout() throws Exception {
+ String groupId = "test";
+
+ mockLeaderRequest(true);
+
+ RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, new RaftClientCommonMessageBuilderFactory(),
+ 5_000, Collections.singleton(leader.getNode()));
+
+ try {
+ client.refreshLeader(groupId).get();
+
+ fail();
+ }
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof TimeoutException);
+ }
+ }
+
+ @Test
public void testCustomMessage() throws Exception {
String groupId = "test";
- mockClient();
+ mockLeaderRequest(false);
+ mockCustomRequest();
- // TODO FIXME asch where to get initial configuration for the group ?
- RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, 5_000, Collections.singleton(leader.getNode()));
+ RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, new RaftClientCommonMessageBuilderFactory(),
+ 5_000, Collections.singleton(leader.getNode()));
JunkRequest req = new JunkRequest(groupId);
- Future<Message> fut = client.sendCustom(req);
+ Message resp = client.sendCustom(req).get();
State state = client.state(groupId);
// Expecting raft group state to be transparently loaded on first request.
- // TODO FIXME broken
- assertEquals(leader, state.getLeader());
+ assertEquals(leader, state.leader());
- assertTrue(fut.get() instanceof JunkResponse);
+ assertTrue(resp instanceof JunkResponse);
}
private static class JunkRequest implements RaftGroupMessage {
@@ -70,8 +155,7 @@ public class RaftGroupRpcClientTest {
private static class JunkResponse implements Message {}
- private void mockClient() {
- // Mock junk request.
+ private void mockCustomRequest() {
Mockito.doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock invocation) throws Throwable {
InvokeCallback callback = invocation.getArgument(2);
@@ -82,17 +166,23 @@ public class RaftGroupRpcClientTest {
return null;
}
}).when(rpcClient).invokeAsync(eq(leader.getNode()), any(JunkRequest.class), any(), any(), anyLong());
+ }
- // Mock get leader request.
+ private void mockLeaderRequest(boolean timeout) {
Mockito.doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock invocation) throws Throwable {
InvokeCallback callback = invocation.getArgument(2);
Executor executor = invocation.getArgument(3);
- executor.execute(() -> callback.complete(RaftClientCommonMessageBuilderFactory.DEFAULT.createGetLeaderResponse().setLeaderId(leader).build(), null));
+ executor.execute(() -> {
+ if (timeout)
+ callback.complete(null, new TimeoutException());
+ else
+ callback.complete(RaftClientCommonMessageBuilderFactory.DEFAULT.createGetLeaderResponse().setLeaderId(leader).build(), null);
+ });
return null;
}
- }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(RaftClientCommonMessages.GetLeaderRequest.class), any(), any(), anyLong());
+ }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(GetLeaderRequest.class), any(), any(), anyLong());
}
}