You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/27 11:23:48 UTC

[ignite-3] 02/04: IGNITE-14149 wip.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch ignite-14149
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit df9226b76c769bbeadaf91dce90cf695d83d5811
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Sat Feb 27 12:57:02 2021 +0300

    IGNITE-14149 wip.
---
 .../raft/client/message/ClientMessageBuilderFactory.java   |  4 +++-
 .../apache/ignite/raft/client/rpc/RaftGroupRpcClient.java  | 10 ++++++++++
 .../raft/client/rpc/impl/RaftGroupRpcClientImpl.java       | 11 +++++++++--
 .../service/impl/RaftGroupClientRequestServiceImpl.java    | 13 ++++++++++++-
 .../apache/ignite/raft/client/RaftGroupRpcClientTest.java  | 14 ++++++--------
 5 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ClientMessageBuilderFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ClientMessageBuilderFactory.java
index 952fbfd..b0f48cf 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ClientMessageBuilderFactory.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ClientMessageBuilderFactory.java
@@ -4,7 +4,7 @@ import org.apache.ignite.raft.client.RaftClientCommonMessages;
 
 /** */
 public interface ClientMessageBuilderFactory {
-    public static ClientMessageBuilderFactory DEFAULT = new RaftClientCommonMessageBuilderFactory();
+    public static ClientMessageBuilderFactory DEFAULT_MESSAGE_BUILDER_FACTORY = new RaftClientCommonMessageBuilderFactory();
 
     RaftClientCommonMessages.PingRequest.Builder createPingRequest();
 
@@ -43,4 +43,6 @@ public interface ClientMessageBuilderFactory {
     RaftClientCommonMessages.ResetLearnersRequest.Builder createResetLearnersRequest();
 
     RaftClientCommonMessages.LearnersOpResponse.Builder createLearnersOpResponse();
+
+    RaftClientCommonMessages.UserRequest.Builder createUserRequest();
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
index 5f657eb..2eca6e1 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
@@ -19,6 +19,7 @@ package org.apache.ignite.raft.client.rpc;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.raft.State;
 import org.apache.ignite.raft.PeerId;
+import org.apache.ignite.raft.client.message.ClientMessageBuilderFactory;
 import org.apache.ignite.raft.rpc.Message;
 import org.apache.ignite.raft.rpc.RaftGroupMessage;
 import org.jetbrains.annotations.Nullable;
@@ -58,6 +59,13 @@ public interface RaftGroupRpcClient {
     CompletableFuture<PeerId> refreshLeader(String groupId);
 
     /**
+     * Refreshes group members (but without a leader).
+     * @param groupId Group id.
+     * @return A future.
+     */
+    CompletableFuture<State> refreshMembers(String groupId);
+
+    /**
      * Adds a voring peer to the raft group.
      *
      * @param request   request data
@@ -152,4 +160,6 @@ public interface RaftGroupRpcClient {
      * @return a future with result
      */
     <R extends Message> CompletableFuture<R> sendCustom(RaftGroupMessage request);
+
+    ClientMessageBuilderFactory factory();
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
index f0ceda6..e9c67cb 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
@@ -27,7 +27,6 @@ import org.apache.ignite.raft.rpc.RpcClient;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static org.apache.ignite.raft.client.message.RaftClientCommonMessageBuilderFactory.DEFAULT;
 
 public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
     private final ExecutorService executor;
@@ -84,6 +83,10 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
             });
     }
 
+    @Override public CompletableFuture<State> refreshMembers(String groupId) {
+        return null;
+    }
+
     @Override public CompletableFuture<RaftClientCommonMessages.AddPeerResponse> addPeer(RaftClientCommonMessages.AddPeerRequest request) {
         return null;
     }
@@ -130,7 +133,7 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
                 return fut;
 
             if (state.updateFutRef.compareAndSet(null, (fut = new CompletableFuture<>()))) {
-                RaftClientCommonMessages.GetLeaderRequest req = DEFAULT.createGetLeaderRequest().setGroupId(groupId).build();
+                RaftClientCommonMessages.GetLeaderRequest req = factory.createGetLeaderRequest().setGroupId(groupId).build();
 
                 CompletableFuture<GetLeaderResponse> finalFut = fut;
 
@@ -191,6 +194,10 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
         return fut;
     }
 
+    @Override public ClientMessageBuilderFactory factory() {
+        return this.factory;
+    }
+
     private static class StateImpl implements State {
         private volatile PeerId leader;
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java
index 303a90e..b5439c2 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java
@@ -1,8 +1,11 @@
 package org.apache.ignite.raft.client.service.impl;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.raft.client.RaftClientCommonMessages;
 import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
 import org.apache.ignite.raft.client.service.RaftGroupClientRequestService;
+import org.apache.ignite.raft.rpc.Message;
 
 public class RaftGroupClientRequestServiceImpl implements RaftGroupClientRequestService {
     private RaftGroupRpcClient rpcClient;
@@ -12,6 +15,14 @@ public class RaftGroupClientRequestServiceImpl implements RaftGroupClientRequest
     }
 
     @Override public <T, R> CompletableFuture<R> submit(T request) {
-        return null;
+        RaftClientCommonMessages.UserRequest r = rpcClient.factory().createUserRequest().setRequest(request).build();
+
+        return rpcClient.sendCustom(r).thenApply(new Function<Message, R>() {
+            @Override public R apply(Message message) {
+                RaftClientCommonMessages.UserResponse<R> resp = (RaftClientCommonMessages.UserResponse<R>) message;
+
+                return resp.response();
+            }
+        });
     }
 }
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
index d755e86..c476422 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
@@ -5,12 +5,10 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReferenceArray;
 import org.apache.ignite.raft.PeerId;
 import org.apache.ignite.raft.State;
 import org.apache.ignite.raft.client.RaftClientCommonMessages.GetLeaderRequest;
 import org.apache.ignite.raft.client.rpc.impl.RaftGroupRpcClientImpl;
-import org.apache.ignite.raft.client.message.RaftClientCommonMessageBuilderFactory;
 import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
 import org.apache.ignite.raft.rpc.InvokeCallback;
 import org.apache.ignite.raft.rpc.Message;
@@ -19,13 +17,13 @@ import org.apache.ignite.raft.rpc.RaftGroupMessage;
 import org.apache.ignite.raft.rpc.RpcClient;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.parallel.Execution;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.stubbing.Answer;
 
+import static org.apache.ignite.raft.client.message.ClientMessageBuilderFactory.DEFAULT_MESSAGE_BUILDER_FACTORY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -46,7 +44,7 @@ public class RaftGroupRpcClientTest {
 
         mockLeaderRequest(false);
 
-        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, new RaftClientCommonMessageBuilderFactory(),
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, DEFAULT_MESSAGE_BUILDER_FACTORY,
             5_000, Collections.singleton(leader.getNode()));
 
         PeerId leaderId = client.refreshLeader(groupId).get();
@@ -61,7 +59,7 @@ public class RaftGroupRpcClientTest {
 
         mockLeaderRequest(false);
 
-        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, new RaftClientCommonMessageBuilderFactory(),
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, DEFAULT_MESSAGE_BUILDER_FACTORY,
             5_000, Collections.singleton(leader.getNode()));
 
         int cnt = 20;
@@ -106,7 +104,7 @@ public class RaftGroupRpcClientTest {
 
         mockLeaderRequest(true);
 
-        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, new RaftClientCommonMessageBuilderFactory(),
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, DEFAULT_MESSAGE_BUILDER_FACTORY,
             5_000, Collections.singleton(leader.getNode()));
 
         try {
@@ -126,7 +124,7 @@ public class RaftGroupRpcClientTest {
         mockLeaderRequest(false);
         mockCustomRequest();
 
-        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, new RaftClientCommonMessageBuilderFactory(),
+        RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, DEFAULT_MESSAGE_BUILDER_FACTORY,
             5_000, Collections.singleton(leader.getNode()));
 
         JunkRequest req = new JunkRequest(groupId);
@@ -178,7 +176,7 @@ public class RaftGroupRpcClientTest {
                     if (timeout)
                         callback.complete(null, new TimeoutException());
                     else
-                        callback.complete(RaftClientCommonMessageBuilderFactory.DEFAULT.createGetLeaderResponse().setLeaderId(leader).build(), null);
+                        callback.complete(DEFAULT_MESSAGE_BUILDER_FACTORY.createGetLeaderResponse().setLeaderId(leader).build(), null);
                 });
 
                 return null;