You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/27 11:23:48 UTC
[ignite-3] 02/04: IGNITE-14149 wip.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch ignite-14149
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit df9226b76c769bbeadaf91dce90cf695d83d5811
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Sat Feb 27 12:57:02 2021 +0300
IGNITE-14149 wip.
---
.../raft/client/message/ClientMessageBuilderFactory.java | 4 +++-
.../apache/ignite/raft/client/rpc/RaftGroupRpcClient.java | 10 ++++++++++
.../raft/client/rpc/impl/RaftGroupRpcClientImpl.java | 11 +++++++++--
.../service/impl/RaftGroupClientRequestServiceImpl.java | 13 ++++++++++++-
.../apache/ignite/raft/client/RaftGroupRpcClientTest.java | 14 ++++++--------
5 files changed, 40 insertions(+), 12 deletions(-)
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ClientMessageBuilderFactory.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ClientMessageBuilderFactory.java
index 952fbfd..b0f48cf 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ClientMessageBuilderFactory.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ClientMessageBuilderFactory.java
@@ -4,7 +4,7 @@ import org.apache.ignite.raft.client.RaftClientCommonMessages;
/** */
public interface ClientMessageBuilderFactory {
- public static ClientMessageBuilderFactory DEFAULT = new RaftClientCommonMessageBuilderFactory();
+ public static ClientMessageBuilderFactory DEFAULT_MESSAGE_BUILDER_FACTORY = new RaftClientCommonMessageBuilderFactory();
RaftClientCommonMessages.PingRequest.Builder createPingRequest();
@@ -43,4 +43,6 @@ public interface ClientMessageBuilderFactory {
RaftClientCommonMessages.ResetLearnersRequest.Builder createResetLearnersRequest();
RaftClientCommonMessages.LearnersOpResponse.Builder createLearnersOpResponse();
+
+ RaftClientCommonMessages.UserRequest.Builder createUserRequest();
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
index 5f657eb..2eca6e1 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/RaftGroupRpcClient.java
@@ -19,6 +19,7 @@ package org.apache.ignite.raft.client.rpc;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.raft.State;
import org.apache.ignite.raft.PeerId;
+import org.apache.ignite.raft.client.message.ClientMessageBuilderFactory;
import org.apache.ignite.raft.rpc.Message;
import org.apache.ignite.raft.rpc.RaftGroupMessage;
import org.jetbrains.annotations.Nullable;
@@ -58,6 +59,13 @@ public interface RaftGroupRpcClient {
CompletableFuture<PeerId> refreshLeader(String groupId);
/**
+ * Refreshes group members (but without a leader).
+ * @param groupId Group id.
+ * @return A future.
+ */
+ CompletableFuture<State> refreshMembers(String groupId);
+
+ /**
* Adds a voting peer to the raft group.
*
* @param request request data
@@ -152,4 +160,6 @@ public interface RaftGroupRpcClient {
* @return a future with result
*/
<R extends Message> CompletableFuture<R> sendCustom(RaftGroupMessage request);
+
+ ClientMessageBuilderFactory factory();
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
index f0ceda6..e9c67cb 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/rpc/impl/RaftGroupRpcClientImpl.java
@@ -27,7 +27,6 @@ import org.apache.ignite.raft.rpc.RpcClient;
import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import static org.apache.ignite.raft.client.message.RaftClientCommonMessageBuilderFactory.DEFAULT;
public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
private final ExecutorService executor;
@@ -84,6 +83,10 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
});
}
+ @Override public CompletableFuture<State> refreshMembers(String groupId) {
+ return null;
+ }
+
@Override public CompletableFuture<RaftClientCommonMessages.AddPeerResponse> addPeer(RaftClientCommonMessages.AddPeerRequest request) {
return null;
}
@@ -130,7 +133,7 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
return fut;
if (state.updateFutRef.compareAndSet(null, (fut = new CompletableFuture<>()))) {
- RaftClientCommonMessages.GetLeaderRequest req = DEFAULT.createGetLeaderRequest().setGroupId(groupId).build();
+ RaftClientCommonMessages.GetLeaderRequest req = factory.createGetLeaderRequest().setGroupId(groupId).build();
CompletableFuture<GetLeaderResponse> finalFut = fut;
@@ -191,6 +194,10 @@ public class RaftGroupRpcClientImpl implements RaftGroupRpcClient {
return fut;
}
+ @Override public ClientMessageBuilderFactory factory() {
+ return this.factory;
+ }
+
private static class StateImpl implements State {
private volatile PeerId leader;
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java
index 303a90e..b5439c2 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupClientRequestServiceImpl.java
@@ -1,8 +1,11 @@
package org.apache.ignite.raft.client.service.impl;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.raft.client.RaftClientCommonMessages;
import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
import org.apache.ignite.raft.client.service.RaftGroupClientRequestService;
+import org.apache.ignite.raft.rpc.Message;
public class RaftGroupClientRequestServiceImpl implements RaftGroupClientRequestService {
private RaftGroupRpcClient rpcClient;
@@ -12,6 +15,14 @@ public class RaftGroupClientRequestServiceImpl implements RaftGroupClientRequest
}
@Override public <T, R> CompletableFuture<R> submit(T request) {
- return null;
+ RaftClientCommonMessages.UserRequest r = rpcClient.factory().createUserRequest().setRequest(request).build();
+
+ return rpcClient.sendCustom(r).thenApply(new Function<Message, R>() {
+ @Override public R apply(Message message) {
+ RaftClientCommonMessages.UserResponse<R> resp = (RaftClientCommonMessages.UserResponse<R>) message;
+
+ return resp.response();
+ }
+ });
}
}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
index d755e86..c476422 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/RaftGroupRpcClientTest.java
@@ -5,12 +5,10 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.raft.PeerId;
import org.apache.ignite.raft.State;
import org.apache.ignite.raft.client.RaftClientCommonMessages.GetLeaderRequest;
import org.apache.ignite.raft.client.rpc.impl.RaftGroupRpcClientImpl;
-import org.apache.ignite.raft.client.message.RaftClientCommonMessageBuilderFactory;
import org.apache.ignite.raft.client.rpc.RaftGroupRpcClient;
import org.apache.ignite.raft.rpc.InvokeCallback;
import org.apache.ignite.raft.rpc.Message;
@@ -19,13 +17,13 @@ import org.apache.ignite.raft.rpc.RaftGroupMessage;
import org.apache.ignite.raft.rpc.RpcClient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.parallel.Execution;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;
+import static org.apache.ignite.raft.client.message.ClientMessageBuilderFactory.DEFAULT_MESSAGE_BUILDER_FACTORY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -46,7 +44,7 @@ public class RaftGroupRpcClientTest {
mockLeaderRequest(false);
- RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, new RaftClientCommonMessageBuilderFactory(),
+ RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, DEFAULT_MESSAGE_BUILDER_FACTORY,
5_000, Collections.singleton(leader.getNode()));
PeerId leaderId = client.refreshLeader(groupId).get();
@@ -61,7 +59,7 @@ public class RaftGroupRpcClientTest {
mockLeaderRequest(false);
- RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, new RaftClientCommonMessageBuilderFactory(),
+ RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, DEFAULT_MESSAGE_BUILDER_FACTORY,
5_000, Collections.singleton(leader.getNode()));
int cnt = 20;
@@ -106,7 +104,7 @@ public class RaftGroupRpcClientTest {
mockLeaderRequest(true);
- RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, new RaftClientCommonMessageBuilderFactory(),
+ RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, DEFAULT_MESSAGE_BUILDER_FACTORY,
5_000, Collections.singleton(leader.getNode()));
try {
@@ -126,7 +124,7 @@ public class RaftGroupRpcClientTest {
mockLeaderRequest(false);
mockCustomRequest();
- RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, new RaftClientCommonMessageBuilderFactory(),
+ RaftGroupRpcClient client = new RaftGroupRpcClientImpl(rpcClient, DEFAULT_MESSAGE_BUILDER_FACTORY,
5_000, Collections.singleton(leader.getNode()));
JunkRequest req = new JunkRequest(groupId);
@@ -178,7 +176,7 @@ public class RaftGroupRpcClientTest {
if (timeout)
callback.complete(null, new TimeoutException());
else
- callback.complete(RaftClientCommonMessageBuilderFactory.DEFAULT.createGetLeaderResponse().setLeaderId(leader).build(), null);
+ callback.complete(DEFAULT_MESSAGE_BUILDER_FACTORY.createGetLeaderResponse().setLeaderId(leader).build(), null);
});
return null;