You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/10 16:53:56 UTC
[ignite-3] branch ignite-13885 updated: IGNITE-13885 Raft client
wip 4.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch ignite-13885
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-13885 by this push:
new 60e7551 IGNITE-13885 Raft client wip 4.
60e7551 is described below
commit 60e7551dde051fb929137a1c6305347a55066eb0
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Wed Feb 10 19:53:41 2021 +0300
IGNITE-13885 Raft client wip 4.
---
.../raft/rpc/ConnectionOpenedEventListener.java} | 18 ++--------
.../ignite/raft/service/AbstractClientService.java | 14 +++-----
...entService.java => RaftGroupClientService.java} | 18 ++++++++--
...ceImpl.java => RaftGroupClientServiceImpl.java} | 6 +++-
.../org/apache/ignite/raft/service/RouteTable.java | 16 ++++-----
...ntTest.java => RaftGroupClientServiceTest.java} | 41 ++++++++++++++++------
.../org/apache/ignite/raft/rpc/CustomRequest.java | 4 +++
.../org/apache/ignite/raft/rpc/CustomResponse.java | 4 +++
...cessor.java => TestCustomRequestProcessor.java} | 17 ++++-----
.../raft/rpc/TestGetLeaderRequestProcessor.java | 3 +-
10 files changed, 83 insertions(+), 58 deletions(-)
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/ConnectionOpenedEventListener.java
similarity index 57%
copy from modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
copy to modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/ConnectionOpenedEventListener.java
index c9e76c2..3011f99 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/ConnectionOpenedEventListener.java
@@ -16,21 +16,9 @@
*/
package org.apache.ignite.raft.rpc;
-
-import org.apache.ignite.raft.rpc.CliRequests.GetLeaderRequest;
-import org.apache.ignite.raft.rpc.RpcRequests.PingRequest;
-
/**
- * Ping request processor.
+ *
*/
-public class TestGetLeaderRequestProcessor implements RpcProcessor<GetLeaderRequest> {
- @Override public void handleRequest(RpcContext rpcCtx, GetLeaderRequest request) {
- CliRequests.GetLeaderResponse.Builder resp = MessageBuilderFactory.DEFAULT.createGetLeaderResponse();
- resp.setLeaderId("127.0.0.1:8081");
- rpcCtx.sendResponse(resp.build());
- }
-
- @Override public String interest() {
- return GetLeaderRequest.class.getName();
- }
+public interface ConnectionOpenedEventListener {
+ void onOpened(final String remoteAddress, final Connection conn);
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java
index da09246..562ccf0 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java
@@ -44,10 +44,6 @@ public abstract class AbstractClientService implements ClientService {
protected volatile RpcClient rpcClient;
protected RpcOptions rpcOptions;
- public RpcClient getRpcClient() {
- return this.rpcClient;
- }
-
@Override
public boolean isConnected(final Endpoint endpoint) {
final RpcClient rc = this.rpcClient;
@@ -160,12 +156,12 @@ public abstract class AbstractClientService implements ClientService {
if (err == null) {
Status status = Status.OK();
- Message msg;
+ T msg;
if (result instanceof ErrorResponse) {
status = handleErrorResponse((ErrorResponse) result);
- msg = (Message) result;
+ msg = (T) result;
} else {
- msg = (Message) result;
+ msg = (T) result;
}
if (done != null) {
try {
@@ -203,12 +199,12 @@ public abstract class AbstractClientService implements ClientService {
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
future.completeExceptionally(e);
- // should be in another thread to avoid dead locking.
+ // Should be in another thread to avoid deadlocking.
RpcUtils.runClosureInExecutor(currExecutor, done,
new Status(RaftError.EINTR, "Sending rpc was interrupted"));
} catch (final RemotingException e) {
future.completeExceptionally(e);
- // should be in another thread to avoid dead locking.
+ // Should be in another thread to avoid deadlocking.
RpcUtils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTERNAL,
"Fail to send a RPC request:" + e.getMessage()));
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientService.java
similarity index 90%
rename from modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java
rename to modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientService.java
index cd0d6ca..1ca44ab 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientService.java
@@ -22,11 +22,12 @@ import org.apache.ignite.raft.closure.RpcResponseClosure;
import org.apache.ignite.raft.rpc.CliRequests;
import org.apache.ignite.raft.rpc.Message;
import org.apache.ignite.raft.rpc.RpcRequests;
+import org.apache.ignite.raft.rpc.RpcRequests.PingRequest;
/**
- * Cli RPC client service.
+ * Raft group RPC client service.
*/
-public interface CliClientService extends ClientService {
+public interface RaftGroupClientService extends ClientService {
/**
* Ping a node.
*
@@ -35,7 +36,7 @@ public interface CliClientService extends ClientService {
* @param done callback
* @return a future with result
*/
- Future<Message> ping(Endpoint endpoint, RpcRequests.PingRequest request,
+ Future<Message> ping(Endpoint endpoint, PingRequest request,
RpcResponseClosure<RpcRequests.ErrorResponse> done);
/**
@@ -161,4 +162,15 @@ public interface CliClientService extends ClientService {
*/
Future<Message> getPeers(Endpoint endpoint, CliRequests.GetPeersRequest request,
RpcResponseClosure<CliRequests.GetPeersResponse> done);
+
+ /**
+ * Send custom request.
+ *
+ * @param endpoint server address
+ * @param request request data
+ * @param done callback
+ * @return a future with result
+ */
+ Future<Message> sendCustom(Endpoint endpoint, Message request,
+ RpcResponseClosure<Message> done);
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientServiceImpl.java
similarity index 94%
rename from modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java
rename to modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientServiceImpl.java
index f6211ff..cb59429 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientServiceImpl.java
@@ -43,7 +43,7 @@ import org.apache.ignite.raft.rpc.RpcRequests.ErrorResponse;
/**
*
*/
-public class CliClientServiceImpl extends AbstractClientService implements CliClientService {
+public class RaftGroupClientServiceImpl extends AbstractClientService implements RaftGroupClientService {
@Override public Future<Message> ping(Endpoint endpoint, RpcRequests.PingRequest request,
final RpcResponseClosure<ErrorResponse> done) {
return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
@@ -114,4 +114,8 @@ public class CliClientServiceImpl extends AbstractClientService implements CliCl
final RpcResponseClosure<GetPeersResponse> done) {
return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
}
+
+ @Override public Future<Message> sendCustom(Endpoint endpoint, Message request, RpcResponseClosure<Message> done) {
+ return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
+ }
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java
index 916a768..65e6f2b 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java
@@ -82,7 +82,7 @@ public class RouteTable {
/**
* Get the cached leader of the group, return it when found, null otherwise.
- * Make sure calls {@link #refreshLeader(CliClientService, String, int)} already
+ * Make sure calls {@link #refreshLeader(RaftGroupClientService, String, int)} already
* before invoke this method.
*
* @param groupId raft group id
@@ -174,7 +174,7 @@ public class RouteTable {
* @param timeoutMs timeout millis
* @return operation status
*/
- public Status refreshLeader(final CliClientService cliClientService, final String groupId, final int timeoutMs)
+ public Status refreshLeader(final RaftGroupClientService raftGroupClientService, final String groupId, final int timeoutMs)
throws InterruptedException,
TimeoutException {
final Configuration conf = getConfiguration(groupId);
@@ -188,7 +188,7 @@ public class RouteTable {
final CliRequests.GetLeaderRequest request = rb.build();
TimeoutException timeoutException = null;
for (final PeerId peer : conf) {
- if (!cliClientService.connect(peer.getEndpoint())) {
+ if (!raftGroupClientService.connect(peer.getEndpoint())) {
if (st.isOk()) {
st.setError(-1, "Fail to init channel to %s", peer);
} else {
@@ -197,7 +197,7 @@ public class RouteTable {
}
continue;
}
- final Future<Message> result = cliClientService.getLeader(peer.getEndpoint(), request, null);
+ final Future<Message> result = raftGroupClientService.getLeader(peer.getEndpoint(), request, null);
try {
final Message msg = result.get(timeoutMs, TimeUnit.MILLISECONDS);
if (msg instanceof RpcRequests.ErrorResponse) {
@@ -230,7 +230,7 @@ public class RouteTable {
return st;
}
- public Status refreshConfiguration(final CliClientService cliClientService, final String groupId,
+ public Status refreshConfiguration(final RaftGroupClientService raftGroupClientService, final String groupId,
final int timeoutMs) throws InterruptedException, TimeoutException {
final Configuration conf = getConfiguration(groupId);
if (conf == null) {
@@ -240,14 +240,14 @@ public class RouteTable {
final Status st = Status.OK();
PeerId leaderId = selectLeader(groupId);
if (leaderId == null) {
- refreshLeader(cliClientService, groupId, timeoutMs);
+ refreshLeader(raftGroupClientService, groupId, timeoutMs);
leaderId = selectLeader(groupId);
}
if (leaderId == null) {
st.setError(-1, "Fail to get leader of group %s", groupId);
return st;
}
- if (!cliClientService.connect(leaderId.getEndpoint())) {
+ if (!raftGroupClientService.connect(leaderId.getEndpoint())) {
st.setError(-1, "Fail to init channel to %s", leaderId);
return st;
}
@@ -255,7 +255,7 @@ public class RouteTable {
rb.setGroupId(groupId);
rb.setLeaderId(leaderId.toString());
try {
- final Message result = cliClientService.getPeers(leaderId.getEndpoint(), rb.build(), null).get(timeoutMs,
+ final Message result = raftGroupClientService.getPeers(leaderId.getEndpoint(), rb.build(), null).get(timeoutMs,
TimeUnit.MILLISECONDS);
if (result instanceof CliRequests.GetPeersResponse) {
final CliRequests.GetPeersResponse resp = (CliRequests.GetPeersResponse) result;
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftGroupClientServiceTest.java
similarity index 68%
rename from modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java
rename to modules/raft-client/src/test/java/org/apache/ignite/raft/RaftGroupClientServiceTest.java
index 54404a8..f3257fb 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftGroupClientServiceTest.java
@@ -7,23 +7,30 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.raft.closure.RpcResponseClosureAdapter;
+import org.apache.ignite.raft.rpc.CustomRequest;
+import org.apache.ignite.raft.rpc.CustomResponse;
import org.apache.ignite.raft.rpc.Message;
import org.apache.ignite.raft.rpc.MessageBuilderFactory;
import org.apache.ignite.raft.rpc.RpcOptions;
import org.apache.ignite.raft.rpc.RpcRequests;
import org.apache.ignite.raft.rpc.RpcRequests.ErrorResponse;
+import org.apache.ignite.raft.rpc.TestCustomRequestProcessor;
import org.apache.ignite.raft.rpc.TestGetLeaderRequestProcessor;
import org.apache.ignite.raft.rpc.TestPingRequestProcessor;
import org.apache.ignite.raft.rpc.impl.LocalRpcServer;
-import org.apache.ignite.raft.service.CliClientServiceImpl;
+import org.apache.ignite.raft.service.RaftGroupClientServiceImpl;
import org.apache.ignite.raft.service.RouteTable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-public class RaftClientTest {
- private static final System.Logger LOG = System.getLogger(RaftClientTest.class.getName());
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RaftGroupClientServiceTest {
+ private static final System.Logger LOG = System.getLogger(RaftGroupClientServiceTest.class.getName());
private static final Configuration initConf = new Configuration(Arrays.asList(
new PeerId("127.0.0.1", 8080),
@@ -39,6 +46,7 @@ public class RaftClientTest {
LocalRpcServer srv = new LocalRpcServer(peer.getEndpoint());
srv.registerProcessor(new TestPingRequestProcessor());
srv.registerProcessor(new TestGetLeaderRequestProcessor());
+ srv.registerProcessor(new TestCustomRequestProcessor());
srv.init(new RpcOptions());
srvs.add(srv);
}
@@ -54,29 +62,42 @@ public class RaftClientTest {
}
@Test
+ public void testCustom() throws TimeoutException, InterruptedException, ExecutionException {
+ final RaftGroupClientServiceImpl cliClientService = new RaftGroupClientServiceImpl();
+ cliClientService.init(new RpcOptions());
+
+ Future<Message> resp = cliClientService.sendCustom(initConf.getPeers().get(0).getEndpoint(), new CustomRequest(), null);
+
+ assertTrue(resp.get() instanceof CustomResponse);
+ }
+
+ @Test
public void testPing() throws TimeoutException, InterruptedException, ExecutionException {
String groupId = "unittest";
RouteTable.getInstance().updateConfiguration(groupId, initConf);
- final CliClientServiceImpl cliClientService = new CliClientServiceImpl();
+ final RaftGroupClientServiceImpl cliClientService = new RaftGroupClientServiceImpl();
cliClientService.init(new RpcOptions());
RpcRequests.PingRequest.Builder builder = MessageBuilderFactory.DEFAULT.createPingRequest();
builder.setSendTimestamp(System.currentTimeMillis());
RpcRequests.PingRequest req = builder.build();
+ AtomicReference<Status> ref = new AtomicReference<>();
+
RpcResponseClosureAdapter<ErrorResponse> done = new RpcResponseClosureAdapter<>() {
@Override public void run(Status status) {
- System.out.println();
+ ref.set(status);
}
};
Future<Message> resp = cliClientService.ping(initConf.getPeers().get(0).getEndpoint(), req, done);
- Message msg = resp.get();
+ ErrorResponse msg = (ErrorResponse) resp.get();
- System.out.println();
+ assertEquals(RaftError.SUCCESS.getNumber(), msg.getErrorCode());
+ assertTrue(ref.get().isOk());
}
@Test
@@ -93,10 +114,10 @@ public class RaftClientTest {
RouteTable.getInstance().updateConfiguration(groupId, initConf);
- final CliClientServiceImpl cliClientService = new CliClientServiceImpl();
- cliClientService.init(new RpcOptions());
+ final RaftGroupClientServiceImpl raftGroupClientService = new RaftGroupClientServiceImpl();
+ raftGroupClientService.init(new RpcOptions());
- if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) {
+ if (!RouteTable.getInstance().refreshLeader(raftGroupClientService, groupId, 1000).isOk()) {
throw new IllegalStateException("Refresh leader failed");
}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomRequest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomRequest.java
new file mode 100644
index 0000000..8f53d99
--- /dev/null
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomRequest.java
@@ -0,0 +1,4 @@
+package org.apache.ignite.raft.rpc;
+
+public class CustomRequest implements Message {
+}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomResponse.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomResponse.java
new file mode 100644
index 0000000..b795f2c
--- /dev/null
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomResponse.java
@@ -0,0 +1,4 @@
+package org.apache.ignite.raft.rpc;
+
+public class CustomResponse implements Message {
+}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestCustomRequestProcessor.java
similarity index 60%
copy from modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
copy to modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestCustomRequestProcessor.java
index c9e76c2..a5c64a0 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestCustomRequestProcessor.java
@@ -17,20 +17,17 @@
package org.apache.ignite.raft.rpc;
-import org.apache.ignite.raft.rpc.CliRequests.GetLeaderRequest;
-import org.apache.ignite.raft.rpc.RpcRequests.PingRequest;
-
/**
* Ping request processor.
*/
-public class TestGetLeaderRequestProcessor implements RpcProcessor<GetLeaderRequest> {
- @Override public void handleRequest(RpcContext rpcCtx, GetLeaderRequest request) {
- CliRequests.GetLeaderResponse.Builder resp = MessageBuilderFactory.DEFAULT.createGetLeaderResponse();
- resp.setLeaderId("127.0.0.1:8081");
- rpcCtx.sendResponse(resp.build());
+public class TestCustomRequestProcessor implements RpcProcessor<CustomRequest> {
+ @Override
+ public void handleRequest(final RpcContext rpcCtx, final CustomRequest request) {
+ rpcCtx.sendResponse(new CustomResponse());
}
- @Override public String interest() {
- return GetLeaderRequest.class.getName();
+ @Override
+ public String interest() {
+ return CustomRequest.class.getName();
}
}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
index c9e76c2..934a2ec 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
@@ -18,10 +18,9 @@ package org.apache.ignite.raft.rpc;
import org.apache.ignite.raft.rpc.CliRequests.GetLeaderRequest;
-import org.apache.ignite.raft.rpc.RpcRequests.PingRequest;
/**
- * Ping request processor.
+ * Get leader request processor.
*/
public class TestGetLeaderRequestProcessor implements RpcProcessor<GetLeaderRequest> {
@Override public void handleRequest(RpcContext rpcCtx, GetLeaderRequest request) {