You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/16 18:02:29 UTC

[ignite-3] 04/04: IGNITE-14149 wip 2.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch ignite-14149
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit f9d1a7b6873da7ffb6edcac1af649a508db35c67
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Tue Feb 16 21:02:07 2021 +0300

    IGNITE-14149 wip 2.
---
 modules/raft-client/pom.xml                        |   5 +
 .../main/java/org/apache/ignite/raft/State.java    |   2 +-
 .../raft/client/RaftClientCommonMessages.java      |   4 +-
 .../ignite/raft/client/RaftGroupRpcClient.java     |   1 -
 .../raft/client/impl/RaftGroupRpcClientImpl.java   | 144 +++++++++++++++++++++
 .../message/CreateGetLeaderResponseImpl.java       |  22 ----
 ...rRequestImpl.java => GetLeaderRequestImpl.java} |   3 +-
 .../raft/client/message/GetLeaderResponseImpl.java |  23 ++++
 .../RaftClientCommonMessageBuilderFactory.java     |   4 +-
 .../org/apache/ignite/raft/rpc/InvokeCallback.java |   4 +-
 .../main/java/org/apache/ignite/raft/rpc/Node.java |   3 +
 .../java/org/apache/ignite/raft/rpc/NodeImpl.java  |   1 +
 .../ignite/raft/client/RaftGroupRpcClientTest.java |  92 +++++++++++--
 13 files changed, 268 insertions(+), 40 deletions(-)

diff --git a/modules/raft-client/pom.xml b/modules/raft-client/pom.xml
index d2613d0..05f2489 100644
--- a/modules/raft-client/pom.xml
+++ b/modules/raft-client/pom.xml
@@ -42,6 +42,11 @@
         </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/State.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/State.java
index d924847..5f5fb15 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/State.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/State.java
@@ -30,7 +30,7 @@ public class State implements Iterable<PeerId> {
     /** Mark a learner peer */
     private static final String LEARNER_POSTFIX = "/learner";
 
-    private PeerId leader;
+    private volatile PeerId leader;
 
     private List<PeerId> peers = new ArrayList<>();
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientCommonMessages.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientCommonMessages.java
index 150fe7d..39b9b90 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientCommonMessages.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftClientCommonMessages.java
@@ -172,12 +172,12 @@ public final class RaftClientCommonMessages {
     }
 
     public interface GetLeaderResponse extends Message {
-        String getLeaderId();
+        PeerId getLeaderId();
 
         public interface Builder {
             GetLeaderResponse build();
 
-            Builder setLeaderId(String leaderId);
+            Builder setLeaderId(PeerId leaderId);
         }
     }
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftGroupRpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftGroupRpcClient.java
index f0910d0..bccfded 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftGroupRpcClient.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/RaftGroupRpcClient.java
@@ -61,7 +61,6 @@ public interface RaftGroupRpcClient {
      *
      * @param endpoint  server address
      * @param request   request data
-     * @param done      callback
      * @return a future with result
      */
     Future<RemovePeerResponse> removePeer(RemovePeerRequest request);
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/impl/RaftGroupRpcClientImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/impl/RaftGroupRpcClientImpl.java
new file mode 100644
index 0000000..affa939
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/impl/RaftGroupRpcClientImpl.java
@@ -0,0 +1,144 @@
+package org.apache.ignite.raft.client.impl;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.raft.PeerId;
+import org.apache.ignite.raft.State;
+import org.apache.ignite.raft.client.RaftClientCommonMessages;
+import org.apache.ignite.raft.client.RaftGroupRpcClient;
+import org.apache.ignite.raft.client.message.RaftClientCommonMessageBuilderFactory;
+import org.apache.ignite.raft.rpc.InvokeCallback;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.Node;
+import org.apache.ignite.raft.rpc.RaftGroupMessage;
+import org.apache.ignite.raft.rpc.RpcClient;
+
+/**
+ * {@link RaftGroupRpcClient} implementation which keeps a per-group {@link State} and sends custom requests
+ * to the group leader, resolving the leader lazily on the first request for a group.
+ * <p>
+ * Only {@link #state(String, boolean)} and {@link #sendCustom(RaftGroupMessage)} are implemented so far;
+ * all other operations are stubs which return {@code null}.
+ */
+public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
+    /** Executor handed to the RPC client on every invocation. */
+    private final ExecutorService executor;
+
+    /** Default timeout in milliseconds, used for RPC invocations and for waiting on a leader lookup. */
+    private final int defaultTimeout;
+
+    /** Underlying RPC client. */
+    private final RpcClient rpcClient;
+
+    /** Where to ask for initial configuration. */
+    private final Set<Node> initialCfgNodes;
+
+    /** Locally known group states, keyed by group id. */
+    private final Map<String, State> states = new ConcurrentHashMap<>();
+
+    /**
+     * Accepts dependencies in constructor.
+     *
+     * @param rpcClient RPC client used to send requests.
+     * @param defaultTimeout Default timeout in milliseconds.
+     * @param initialCfgNodes Initial configuration nodes (the set is copied).
+     */
+    public RaftGroupRpcClientImpl(RpcClient rpcClient, int defaultTimeout, Set<Node> initialCfgNodes) {
+        this.defaultTimeout = defaultTimeout;
+        this.rpcClient = rpcClient;
+        this.initialCfgNodes = new HashSet<>(initialCfgNodes);
+        executor = Executors.newWorkStealingPool();
+    }
+
+    /**
+     * Returns the locally cached state of a group.
+     *
+     * @param groupId Group id.
+     * @param refresh Currently ignored: no remote refresh is performed.
+     * @return Cached state, or {@code null} if the group is not known to this client yet.
+     */
+    @Override public State state(String groupId, boolean refresh) {
+        return states.get(groupId);
+    }
+
+    /** Not implemented yet: always returns {@code null}. */
+    @Override public Future<RaftClientCommonMessages.AddPeerResponse> addPeer(RaftClientCommonMessages.AddPeerRequest request) {
+        return null;
+    }
+
+    /** Not implemented yet: always returns {@code null}. */
+    @Override public Future<RaftClientCommonMessages.RemovePeerResponse> removePeer(RaftClientCommonMessages.RemovePeerRequest request) {
+        return null;
+    }
+
+    /** Not implemented yet: always returns {@code null}. */
+    @Override public Future<RaftClientCommonMessages.StatusResponse> resetPeers(PeerId peerId, RaftClientCommonMessages.ResetPeerRequest request) {
+        return null;
+    }
+
+    /** Not implemented yet: always returns {@code null}. */
+    @Override public Future<RaftClientCommonMessages.StatusResponse> snapshot(PeerId peerId, RaftClientCommonMessages.SnapshotRequest request) {
+        return null;
+    }
+
+    /** Not implemented yet: always returns {@code null}. */
+    @Override public Future<RaftClientCommonMessages.ChangePeersResponse> changePeers(RaftClientCommonMessages.ChangePeersRequest request) {
+        return null;
+    }
+
+    /** Not implemented yet: always returns {@code null}. */
+    @Override public Future<RaftClientCommonMessages.LearnersOpResponse> addLearners(RaftClientCommonMessages.AddLearnersRequest request) {
+        return null;
+    }
+
+    /** Not implemented yet: always returns {@code null}. */
+    @Override public Future<RaftClientCommonMessages.LearnersOpResponse> removeLearners(RaftClientCommonMessages.RemoveLearnersRequest request) {
+        return null;
+    }
+
+    /** Not implemented yet: always returns {@code null}. */
+    @Override public Future<RaftClientCommonMessages.LearnersOpResponse> resetLearners(RaftClientCommonMessages.ResetLearnersRequest request) {
+        return null;
+    }
+
+    /** Not implemented yet: always returns {@code null}. */
+    @Override public Future<RaftClientCommonMessages.StatusResponse> transferLeader(RaftClientCommonMessages.TransferLeaderRequest request) {
+        return null;
+    }
+
+    /**
+     * Asks the given node who the leader of the group is.
+     *
+     * @param node Node to send the request to.
+     * @param groupId Group id.
+     * @return A future completed with the response or with the invocation error.
+     */
+    private CompletableFuture<RaftClientCommonMessages.GetLeaderResponse> refreshLeader(Node node, String groupId) {
+        RaftClientCommonMessages.GetLeaderRequest req = RaftClientCommonMessageBuilderFactory.DEFAULT.createGetLeaderRequest().setGroupId(groupId).build();
+
+        CompletableFuture<RaftClientCommonMessages.GetLeaderResponse> fut = new CompletableFuture<>();
+
+        rpcClient.invokeAsync(node, req, new InvokeCallback<RaftClientCommonMessages.GetLeaderResponse>() {
+            @Override public void complete(RaftClientCommonMessages.GetLeaderResponse response, Throwable err) {
+                if (err != null)
+                    fut.completeExceptionally(err);
+                else
+                    fut.complete(response);
+            }
+        }, executor, defaultTimeout);
+
+        return fut;
+    }
+
+    /**
+     * Sends a custom request to the leader of the request's group.
+     * <p>
+     * If the leader is not known yet, it is first requested from one of the initial configuration nodes;
+     * the calling thread blocks while the leader is being resolved.
+     *
+     * @param request Request to send.
+     * @return A future completed with the response, or completed exceptionally if the leader cannot be
+     *      resolved, the invocation fails or the default timeout elapses.
+     */
+    @Override public <R extends Message> Future<R> sendCustom(RaftGroupMessage request) {
+        State state = states.computeIfAbsent(request.getGroupId(), grpId -> new State());
+
+        CompletableFuture<R> fut = new CompletableFuture<>();
+
+        fut.orTimeout(defaultTimeout, TimeUnit.MILLISECONDS);
+
+        // Read the leader once into a local so that the check and the use below see the same value.
+        PeerId leader = state.getLeader();
+
+        if (leader == null) {
+            synchronized (state) {
+                // Re-check under the lock: a concurrent caller may have resolved the leader while we were waiting.
+                leader = state.getLeader();
+
+                if (leader == null) {
+                    try {
+                        // TODO asch search all nodes.
+                        CompletableFuture<RaftClientCommonMessages.GetLeaderResponse> fut0 =
+                            refreshLeader(initialCfgNodes.iterator().next(), request.getGroupId());
+
+                        leader = fut0.get(defaultTimeout, TimeUnit.MILLISECONDS).getLeaderId();
+
+                        if (leader == null)
+                            throw new IllegalStateException("Leader is not known for group: " + request.getGroupId());
+
+                        state.setLeader(leader);
+                    }
+                    catch (Exception e) {
+                        // Do not lose the interrupt status: the caller only observes the failed future.
+                        if (e instanceof InterruptedException)
+                            Thread.currentThread().interrupt();
+
+                        fut.completeExceptionally(e);
+
+                        return fut;
+                    }
+                }
+            }
+        }
+
+        rpcClient.invokeAsync(leader.getNode(), request, new InvokeCallback<R>() {
+            @Override public void complete(R response, Throwable err) {
+                if (err != null)
+                    fut.completeExceptionally(err);
+                else
+                    fut.complete(response);
+            }
+        }, executor, defaultTimeout);
+
+        return fut;
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/CreateGetLeaderResponseImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/CreateGetLeaderResponseImpl.java
deleted file mode 100644
index 5b58d48..0000000
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/CreateGetLeaderResponseImpl.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.ignite.raft.client.message;
-
-
-import org.apache.ignite.raft.client.RaftClientCommonMessages;
-
-public class CreateGetLeaderResponseImpl implements RaftClientCommonMessages.GetLeaderResponse, RaftClientCommonMessages.GetLeaderResponse.Builder {
-    private String leaderId;
-
-    @Override public String getLeaderId() {
-        return leaderId;
-    }
-
-    @Override public RaftClientCommonMessages.GetLeaderResponse build() {
-        return this;
-    }
-
-    @Override public Builder setLeaderId(String leaderId) {
-        this.leaderId = leaderId;
-
-        return this;
-    }
-}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/CreateGetLeaderRequestImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java
similarity index 70%
rename from modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/CreateGetLeaderRequestImpl.java
rename to modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java
index 9c44e11..429bab4 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/CreateGetLeaderRequestImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderRequestImpl.java
@@ -1,10 +1,9 @@
 package org.apache.ignite.raft.client.message;
 
 
-import org.apache.ignite.raft.PeerId;
 import org.apache.ignite.raft.client.RaftClientCommonMessages;
 
-public class CreateGetLeaderRequestImpl implements RaftClientCommonMessages.GetLeaderRequest, RaftClientCommonMessages.GetLeaderRequest.Builder {
+public class GetLeaderRequestImpl implements RaftClientCommonMessages.GetLeaderRequest, RaftClientCommonMessages.GetLeaderRequest.Builder {
     private String groupId;
 
     @Override public String getGroupId() {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderResponseImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderResponseImpl.java
new file mode 100644
index 0000000..22789b1
--- /dev/null
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/GetLeaderResponseImpl.java
@@ -0,0 +1,23 @@
+package org.apache.ignite.raft.client.message;
+
+
+import org.apache.ignite.raft.PeerId;
+import org.apache.ignite.raft.client.RaftClientCommonMessages;
+
+/**
+ * Mutable {@link RaftClientCommonMessages.GetLeaderResponse} which doubles as its own builder:
+ * {@link #build()} returns this very instance.
+ */
+public class GetLeaderResponseImpl implements RaftClientCommonMessages.GetLeaderResponse, RaftClientCommonMessages.GetLeaderResponse.Builder {
+    /** Leader peer id carried by the response. */
+    private PeerId leaderId;
+
+    /** Stores the leader id and returns this builder for chaining. */
+    @Override public Builder setLeaderId(PeerId leaderId) {
+        this.leaderId = leaderId;
+
+        return this;
+    }
+
+    /** Returns this instance as the built response. */
+    @Override public RaftClientCommonMessages.GetLeaderResponse build() {
+        return this;
+    }
+
+    /** Returns the leader id set via {@link #setLeaderId(PeerId)}, or {@code null} if it was never set. */
+    @Override public PeerId getLeaderId() {
+        return leaderId;
+    }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientCommonMessageBuilderFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientCommonMessageBuilderFactory.java
index f54ec48..45f070a 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientCommonMessageBuilderFactory.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/RaftClientCommonMessageBuilderFactory.java
@@ -53,11 +53,11 @@ public class RaftClientCommonMessageBuilderFactory implements ClientMessageBuild
     }
 
     @Override public RaftClientCommonMessages.GetLeaderRequest.Builder createGetLeaderRequest() {
-        return new CreateGetLeaderRequestImpl();
+        return new GetLeaderRequestImpl();
     }
 
     @Override public RaftClientCommonMessages.GetLeaderResponse.Builder createGetLeaderResponse() {
-        return new CreateGetLeaderResponseImpl();
+        return new GetLeaderResponseImpl();
     }
 
     @Override public RaftClientCommonMessages.GetPeersRequest.Builder createGetPeersRequest() {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java
index c58b5e2..743d519 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/InvokeCallback.java
@@ -19,6 +19,6 @@ package org.apache.ignite.raft.rpc;
 /**
  *
  */
-public interface InvokeCallback {
-    void complete(final Message response, final Throwable err);
+public interface InvokeCallback<T extends Message> {
+    void complete(final T response, final Throwable err);
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java
index bfff562..966ff52 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/Node.java
@@ -2,6 +2,9 @@ package org.apache.ignite.raft.rpc;
 
 import java.io.Serializable;
 
+/**
+ * TODO FIXME asch must be elsewhere.
+ */
 public interface Node extends Serializable {
     String id();
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
index 63556ac..f71424f 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/NodeImpl.java
@@ -18,6 +18,7 @@ package org.apache.ignite.raft.rpc;
 
 /**
  * A node with an immutable id.
+ * TODO FIXME asch must be elsewhere.
  */
 public class NodeImpl implements Node {
     private final String id;
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
index d5bd8e1..bf1656a 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
@@ -1,21 +1,97 @@
 package org.apache.ignite.raft.client;
 
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import org.apache.ignite.raft.PeerId;
+import org.apache.ignite.raft.State;
+import org.apache.ignite.raft.client.impl.RaftGroupRpcClientImpl;
+import org.apache.ignite.raft.client.message.GetLeaderResponseImpl;
+import org.apache.ignite.raft.client.message.RaftClientCommonMessageBuilderFactory;
+import org.apache.ignite.raft.rpc.InvokeCallback;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.NodeImpl;
+import org.apache.ignite.raft.rpc.RaftGroupMessage;
+import org.apache.ignite.raft.rpc.RpcClient;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.stubbing.Answer;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+
+@ExtendWith(MockitoExtension.class)
 public class RaftGroupRpcClientTest {
-    @BeforeEach
-    public void beforeTest() {
+    @Mock
+    private RpcClient rpcClient;
+
+    private static PeerId leader = new PeerId(new NodeImpl("test"));
+
+    @Test
+    public void testCustomMessage() throws Exception {
+        String groupId = "test";
+
+        mockClient();
+
+        // TODO FIXME asch where to get initial configuration for the group ?
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, 5_000, Collections.singleton(leader.getNode()));
+
+        JunkRequest req = new JunkRequest(groupId);
+
+        Future<Message> fut = client.sendCustom(req);
+
+        State state = client.state(groupId, false);
 
+        // Expecting raft group state to be transparently loaded on first request.
+        assertEquals(leader, state.getLeader());
+
+        assertTrue(fut.get() instanceof JunkResponse);
     }
 
-    @AfterEach
-    public void afterTest() {
+    private static class JunkRequest implements RaftGroupMessage {
+        private final String groupId;
+
+        JunkRequest(String groupId) {
+            this.groupId = groupId;
+        }
+
+        @Override public String getGroupId() {
+            return groupId;
+        }
     }
 
-    @Test
-    public void test() {
+    private static class JunkResponse implements Message {}
+
+    private void mockClient() {
+        // Mock junk request.
+        Mockito.doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
+                InvokeCallback callback = invocation.getArgument(2);
+                Executor executor = invocation.getArgument(3);
+
+                executor.execute(() -> callback.complete(new JunkResponse(), null));
+
+                return null;
+            }
+        }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(JunkRequest.class), any(), any(), anyLong());
+
+        // Mock get leader request.
+        Mockito.doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock invocation) throws Throwable {
+                InvokeCallback callback = invocation.getArgument(2);
+                Executor executor = invocation.getArgument(3);
+
+                executor.execute(() -> callback.complete(RaftClientCommonMessageBuilderFactory.DEFAULT.createGetLeaderResponse().setLeaderId(leader).build(), null));
 
+                return null;
+            }
+        }).when(rpcClient).invokeAsync(eq(leader.getNode()), any(RaftClientCommonMessages.GetLeaderRequest.class), any(), any(), anyLong());
     }
 }