You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/16 18:02:29 UTC
[ignite-3] 04/04: IGNITE-14149 wip 2.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch ignite-14149
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit f9d1a7b6873da7ffb6edcac1af649a508db35c67
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Tue Feb 16 21:02:07 2021 +0300
IGNITE-14149 wip 2.
---
modules/raft-client/pom.xml | 5 +
.../main/java/org/apache/ignite/raft/State.java | 2 +-
.../raft/client/RaftClientCommonMessages.java | 4 +-
.../ignite/raft/client/RaftGroupRpcClient.java | 1 -
.../raft/client/impl/RaftGroupRpcClientImpl.java | 144 +++++++++++++++++++++
.../message/CreateGetLeaderResponseImpl.java | 22 ----
...rRequestImpl.java => GetLeaderRequestImpl.java} | 3 +-
.../raft/client/message/GetLeaderResponseImpl.java | 23 ++++
.../RaftClientCommonMessageBuilderFactory.java | 4 +-
.../org/apache/ignite/raft/rpc/InvokeCallback.java | 4 +-
.../main/java/org/apache/ignite/raft/rpc/Node.java | 3 +
.../java/org/apache/ignite/raft/rpc/NodeImpl.java | 1 +
.../ignite/raft/client/RaftGroupRpcClientTest.java | 92 +++++++++++--
13 files changed, 268 insertions(+), 40 deletions(-)
diff --git a/modules/raft-client/pom.xml b/modules/raft-client/pom.xml
index d2613d0..05f2489 100644
--- a/modules/raft-client/pom.xml
+++ b/modules/raft-client/pom.xml
@@ -42,6 +42,11 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/State.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/State.java
index d924847..5f5fb15 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/State.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/State.java
@@ -30,7 +30,7 @@ public class State implements Iterable<PeerId> {
/** Mark a leaner peer */
private static final String LEARNER_POSTFIX = "/learner";
- private PeerId leader;
+ private volatile PeerId leader;
private List<PeerId> peers = new ArrayList<>();
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientCommonMessages.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientCommonMessages.java
index 150fe7d..39b9b90 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientCommonMessages.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientCommonMessages.java
@@ -172,12 +172,12 @@ public final class RaftClientCommonMessages {
}
public interface GetLeaderResponse extends Message {
- String getLeaderId();
+ PeerId getLeaderId();
public interface Builder {
GetLeaderResponse build();
- Builder setLeaderId(String leaderId);
+ Builder setLeaderId(PeerId leaderId);
}
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftGroupRpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftGroupRpcClient.java
index f0910d0..bccfded 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftGroupRpcClient.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftGroupRpcClient.java
@@ -61,7 +61,6 @@ public interface RaftGroupRpcClient {
*
* @param endpoint server address
* @param request request data
- * @param done callback
* @return a future with result
*/
Future<RemovePeerResponse> removePeer(RemovePeerRequest request);
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/impl/RaftGroupRpcClientImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/impl/RaftGroupRpcClientImpl.java
new file mode 100644
index 0000000..affa939
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/impl/RaftGroupRpcClientImpl.java
@@ -0,0 +1,144 @@
+package org.apache.ignite.raft.client.impl;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.raft.PeerId;
+import org.apache.ignite.raft.State;
+import org.apache.ignite.raft.client.RaftClientCommonMessages;
+import org.apache.ignite.raft.client.RaftGroupRpcClient;
+import org.apache.ignite.raft.client.message.RaftClientCommonMessageBuilderFactory;
+import org.apache.ignite.raft.rpc.InvokeCallback;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.Node;
+import org.apache.ignite.raft.rpc.RaftGroupMessage;
+import org.apache.ignite.raft.rpc.RpcClient;
+
+public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
+ private final ExecutorService executor;
+ private final int defaultTimeout;
+ private final RpcClient rpcClient;
+
+ /** Where to ask for initial configuration. */
+ private final Set<Node> initialCfgNodes;
+
+ private Map<String, State> states = new ConcurrentHashMap<>();
+
+ /**
+ * Accepts dependencies in constructor.
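+ * @param rpcClient RPC client used to send requests to raft group nodes.
+ * @param defaultTimeout Default request timeout, in milliseconds.
+ * @param initialCfgNodes Initial configuration nodes, queried to discover the group leader.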
+ */
+ public RaftGroupRpcClientImpl(RpcClient rpcClient, int defaultTimeout, Set<Node> initialCfgNodes) {
+ this.defaultTimeout = defaultTimeout;
+ this.rpcClient = rpcClient;
+ this.initialCfgNodes = new HashSet<>(initialCfgNodes);
+ executor = Executors.newWorkStealingPool();
+ }
+
+ @Override public State state(String groupId, boolean refresh) {
+ return states.get(groupId);
+ }
+
+ @Override public Future<RaftClientCommonMessages.AddPeerResponse> addPeer(RaftClientCommonMessages.AddPeerRequest request) {
+ return null;
+ }
+
+ @Override public Future<RaftClientCommonMessages.RemovePeerResponse> removePeer(RaftClientCommonMessages.RemovePeerRequest request) {
+ return null;
+ }
+
+ @Override public Future<RaftClientCommonMessages.StatusResponse> resetPeers(PeerId peerId, RaftClientCommonMessages.ResetPeerRequest request) {
+ return null;
+ }
+
+ @Override public Future<RaftClientCommonMessages.StatusResponse> snapshot(PeerId peerId, RaftClientCommonMessages.SnapshotRequest request) {
+ return null;
+ }
+
+ @Override public Future<RaftClientCommonMessages.ChangePeersResponse> changePeers(RaftClientCommonMessages.ChangePeersRequest request) {
+ return null;
+ }
+
+ @Override public Future<RaftClientCommonMessages.LearnersOpResponse> addLearners(RaftClientCommonMessages.AddLearnersRequest request) {
+ return null;
+ }
+
+ @Override public Future<RaftClientCommonMessages.LearnersOpResponse> removeLearners(RaftClientCommonMessages.RemoveLearnersRequest request) {
+ return null;
+ }
+
+ @Override public Future<RaftClientCommonMessages.LearnersOpResponse> resetLearners(RaftClientCommonMessages.ResetLearnersRequest request) {
+ return null;
+ }
+
+ @Override public Future<RaftClientCommonMessages.StatusResponse> transferLeader(RaftClientCommonMessages.TransferLeaderRequest request) {
+ return null;
+ }
+
+ private CompletableFuture<RaftClientCommonMessages.GetLeaderResponse> refreshLeader(Node node, String groupId) {
+ RaftClientCommonMessages.GetLeaderRequest req = RaftClientCommonMessageBuilderFactory.DEFAULT.createGetLeaderRequest().setGroupId(groupId).build();
+
+ CompletableFuture<RaftClientCommonMessages.GetLeaderResponse> fut = new CompletableFuture<>();
+
+ rpcClient.invokeAsync(node, req, new InvokeCallback<RaftClientCommonMessages.GetLeaderResponse>() {
+ @Override public void complete(RaftClientCommonMessages.GetLeaderResponse response, Throwable err) {
+ if (err != null)
+ fut.completeExceptionally(err);
+ else
+ fut.complete(response);
+ }
+ }, executor, defaultTimeout);
+
+ return fut;
+ }
+
+ @Override public <R extends Message> Future<R> sendCustom(RaftGroupMessage request) {
+ State newState = new State();
+
+ State state = states.putIfAbsent(request.getGroupId(), newState);
+
+ if (state == null)
+ state = newState;
+
+ CompletableFuture<R> fut = new CompletableFuture<>();
+
+ fut.orTimeout(defaultTimeout, TimeUnit.MILLISECONDS);
+
+ if (state.getLeader() == null) {
+ synchronized (state) {
+ CompletableFuture<RaftClientCommonMessages.GetLeaderResponse> fut0 =
+ refreshLeader(initialCfgNodes.iterator().next(), request.getGroupId()); // TODO asch search all nodes.
+
+ try {
+ RaftClientCommonMessages.GetLeaderResponse resp = fut0.get(defaultTimeout, TimeUnit.MILLISECONDS);
+
+ state.setLeader(resp.getLeaderId());
+ }
+ catch (Exception e) {
+ fut.completeExceptionally(e);
+
+ return fut;
+ }
+ }
+ }
+
+ rpcClient.invokeAsync(state.getLeader().getNode(), request, new InvokeCallback<R>() {
+ @Override public void complete(R response, Throwable err) {
+ if (err != null)
+ fut.completeExceptionally(err);
+ else
+ fut.complete(response);
+ }
+ }, executor, defaultTimeout);
+
+ return fut;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/CreateGetLeaderResponseImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/CreateGetLeaderResponseImpl.java
deleted file mode 100644
index 5b58d48..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/CreateGetLeaderResponseImpl.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.ignite.raft.client.message;
-
-
-import org.apache.ignite.raft.client.RaftClientCommonMessages;
-
-public class CreateGetLeaderResponseImpl implements RaftClientCommonMessages.GetLeaderResponse, RaftClientCommonMessages.GetLeaderResponse.Builder {
- private String leaderId;
-
- @Override public String getLeaderId() {
- return leaderId;
- }
-
- @Override public RaftClientCommonMessages.GetLeaderResponse build() {
- return this;
- }
-
- @Override public Builder setLeaderId(String leaderId) {
- this.leaderId = leaderId;
-
- return this;
- }
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/CreateGetLeaderRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java
similarity index 70%
rename from modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/CreateGetLeaderRequestImpl.java
rename to modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java
index 9c44e11..429bab4 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/CreateGetLeaderRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java
@@ -1,10 +1,9 @@
package org.apache.ignite.raft.client.message;
-import org.apache.ignite.raft.PeerId;
import org.apache.ignite.raft.client.RaftClientCommonMessages;
-public class CreateGetLeaderRequestImpl implements RaftClientCommonMessages.GetLeaderRequest, RaftClientCommonMessages.GetLeaderRequest.Builder {
+public class GetLeaderRequestImpl implements RaftClientCommonMessages.GetLeaderRequest, RaftClientCommonMessages.GetLeaderRequest.Builder {
private String groupId;
@Override public String getGroupId() {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderResponseImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderResponseImpl.java
new file mode 100644
index 0000000..22789b1
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderResponseImpl.java
@@ -0,0 +1,23 @@
+package org.apache.ignite.raft.client.message;
+
+
+import org.apache.ignite.raft.PeerId;
+import org.apache.ignite.raft.client.RaftClientCommonMessages;
+
+public class GetLeaderResponseImpl implements RaftClientCommonMessages.GetLeaderResponse, RaftClientCommonMessages.GetLeaderResponse.Builder {
+ private PeerId leaderId;
+
+ @Override public PeerId getLeaderId() {
+ return leaderId;
+ }
+
+ @Override public RaftClientCommonMessages.GetLeaderResponse build() {
+ return this;
+ }
+
+ @Override public Builder setLeaderId(PeerId leaderId) {
+ this.leaderId = leaderId;
+
+ return this;
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientCommonMessageBuilderFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientCommonMessageBuilderFactory.java
index f54ec48..45f070a 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientCommonMessageBuilderFactory.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientCommonMessageBuilderFactory.java
@@ -53,11 +53,11 @@ public class RaftClientCommonMessageBuilderFactory implements ClientMessageBuild
}
@Override public RaftClientCommonMessages.GetLeaderRequest.Builder createGetLeaderRequest() {
- return new CreateGetLeaderRequestImpl();
+ return new GetLeaderRequestImpl();
}
@Override public RaftClientCommonMessages.GetLeaderResponse.Builder createGetLeaderResponse() {
- return new CreateGetLeaderResponseImpl();
+ return new GetLeaderResponseImpl();
}
@Override public RaftClientCommonMessages.GetPeersRequest.Builder createGetPeersRequest() {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java
index c58b5e2..743d519 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java
@@ -19,6 +19,6 @@ package org.apache.ignite.raft.rpc;
/**
*
*/
-public interface InvokeCallback {
- void complete(final Message response, final Throwable err);
+public interface InvokeCallback<T extends Message> {
+ void complete(final T response, final Throwable err);
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java
index bfff562..966ff52 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java
@@ -2,6 +2,9 @@ package org.apache.ignite.raft.rpc;
import java.io.Serializable;
+/**
+ * TODO FIXME asch must be elsewhere.
+ */
public interface Node extends Serializable {
String id();
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
index 63556ac..f71424f 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
@@ -18,6 +18,7 @@ package org.apache.ignite.raft.rpc;
/**
* An node with an immutable id.
+ * TODO FIXME asch must be elsewhere.
*/
public class NodeImpl implements Node {
private final String id;
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
index d5bd8e1..bf1656a 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
@@ -1,21 +1,97 @@
package org.apache.ignite.raft.client;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import org.apache.ignite.raft.PeerId;
+import org.apache.ignite.raft.State;
+import org.apache.ignite.raft.client.impl.RaftGroupRpcClientImpl;
+import org.apache.ignite.raft.client.message.GetLeaderResponseImpl;
+import org.apache.ignite.raft.client.message.RaftClientCommonMessageBuilderFactory;
+import org.apache.ignite.raft.rpc.InvokeCallback;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.NodeImpl;
+import org.apache.ignite.raft.rpc.RaftGroupMessage;
+import org.apache.ignite.raft.rpc.RpcClient;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.stubbing.Answer;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+
+@ExtendWith(MockitoExtension.class)
public class RaftGroupRpcClientTest {
- @BeforeEach
- public void beforeTest() {
+ @Mock
+ private RpcClient rpcClient;
+
+ private static PeerId leader = new PeerId(new NodeImpl("test"));
+
+ @Test
+ public void testCustomMessage() throws Exception {
+ String groupId = "test";
+
+ mockClient();
+
+ // TODO FIXME asch where to get initial configuration for the group ?
+ RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, 5_000, Collections.singleton(leader.getNode()));
+
+ JunkRequest req = new JunkRequest(groupId);
+
+ Future<Message> fut = client.sendCustom(req);
+
+ State state = client.state(groupId, false);
+ // Expecting raft group state to be transparently loaded on first request.
+ assertEquals(leader, state.getLeader());
+
+ assertTrue(fut.get() instanceof JunkResponse);
}
- @AfterEach
- public void afterTest() {
+ private static class JunkRequest implements RaftGroupMessage {
+ private final String groupId;
+
+ JunkRequest(String groupId) {
+ this.groupId = groupId;
+ }
+
+ @Override public String getGroupId() {
+ return groupId;
+ }
}
- @Test
- public void test() {
+ private static class JunkResponse implements Message {}
+
+ private void mockClient() {
+ // Mock junk request.
+ Mockito.doAnswer(new Answer() {
+ @Override public Object answer(InvocationOnMock invocation) throws Throwable {
+ InvokeCallback callback = invocation.getArgument(2);
+ Executor executor = invocation.getArgument(3);
+
+ executor.execute(() -> callback.complete(new JunkResponse(), null));
+
+ return null;
+ }
+ }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(JunkRequest.class), any(), any(), anyLong());
+
+ // Mock get leader request.
+ Mockito.doAnswer(new Answer() {
+ @Override public Object answer(InvocationOnMock invocation) throws Throwable {
+ InvokeCallback callback = invocation.getArgument(2);
+ Executor executor = invocation.getArgument(3);
+
+ executor.execute(() -> callback.complete(RaftClientCommonMessageBuilderFactory.DEFAULT.createGetLeaderResponse().setLeaderId(leader).build(), null));
+ return null;
+ }
+ }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(RaftClientCommonMessages.GetLeaderRequest.class), any(), any(), anyLong());
}
}