You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/10 16:53:56 UTC

[ignite-3] branch ignite-13885 updated: IGNITE-13885 Raft client wip 4.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch ignite-13885
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-13885 by this push:
     new 60e7551  IGNITE-13885 Raft client wip 4.
60e7551 is described below

commit 60e7551dde051fb929137a1c6305347a55066eb0
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Wed Feb 10 19:53:41 2021 +0300

    IGNITE-13885 Raft client wip 4.
---
 .../raft/rpc/ConnectionOpenedEventListener.java}   | 18 ++--------
 .../ignite/raft/service/AbstractClientService.java | 14 +++-----
 ...entService.java => RaftGroupClientService.java} | 18 ++++++++--
 ...ceImpl.java => RaftGroupClientServiceImpl.java} |  6 +++-
 .../org/apache/ignite/raft/service/RouteTable.java | 16 ++++-----
 ...ntTest.java => RaftGroupClientServiceTest.java} | 41 ++++++++++++++++------
 .../org/apache/ignite/raft/rpc/CustomRequest.java  |  4 +++
 .../org/apache/ignite/raft/rpc/CustomResponse.java |  4 +++
 ...cessor.java => TestCustomRequestProcessor.java} | 17 ++++-----
 .../raft/rpc/TestGetLeaderRequestProcessor.java    |  3 +-
 10 files changed, 83 insertions(+), 58 deletions(-)

diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/ConnectionOpenedEventListener.java
similarity index 57%
copy from modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
copy to modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/ConnectionOpenedEventListener.java
index c9e76c2..3011f99 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/ConnectionOpenedEventListener.java
@@ -16,21 +16,9 @@
  */
 package org.apache.ignite.raft.rpc;
 
-
-import org.apache.ignite.raft.rpc.CliRequests.GetLeaderRequest;
-import org.apache.ignite.raft.rpc.RpcRequests.PingRequest;
-
 /**
- * Ping request processor.
+ *
  */
-public class TestGetLeaderRequestProcessor implements RpcProcessor<GetLeaderRequest> {
-    @Override public void handleRequest(RpcContext rpcCtx, GetLeaderRequest request) {
-        CliRequests.GetLeaderResponse.Builder resp = MessageBuilderFactory.DEFAULT.createGetLeaderResponse();
-        resp.setLeaderId("127.0.0.1:8081");
-        rpcCtx.sendResponse(resp.build());
-    }
-
-    @Override public String interest() {
-        return GetLeaderRequest.class.getName();
-    }
+public interface ConnectionOpenedEventListener {
+    void onOpened(final String remoteAddress, final Connection conn);
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java
index da09246..562ccf0 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java
@@ -44,10 +44,6 @@ public abstract class AbstractClientService implements ClientService {
     protected volatile RpcClient rpcClient;
     protected RpcOptions rpcOptions;
 
-    public RpcClient getRpcClient() {
-        return this.rpcClient;
-    }
-
     @Override
     public boolean isConnected(final Endpoint endpoint) {
         final RpcClient rc = this.rpcClient;
@@ -160,12 +156,12 @@ public abstract class AbstractClientService implements ClientService {
 
                     if (err == null) {
                         Status status = Status.OK();
-                        Message msg;
+                        T msg;
                         if (result instanceof ErrorResponse) {
                             status = handleErrorResponse((ErrorResponse) result);
-                            msg = (Message) result;
+                            msg = (T) result;
                         } else {
-                            msg = (Message) result;
+                            msg = (T) result;
                         }
                         if (done != null) {
                             try {
@@ -203,12 +199,12 @@ public abstract class AbstractClientService implements ClientService {
         } catch (final InterruptedException e) {
             Thread.currentThread().interrupt();
             future.completeExceptionally(e);
-            // should be in another thread to avoid dead locking.
+            // Should be in another thread to avoid deadlocking.
             RpcUtils.runClosureInExecutor(currExecutor, done,
                 new Status(RaftError.EINTR, "Sending rpc was interrupted"));
         } catch (final RemotingException e) {
             future.completeExceptionally(e);
-            // should be in another thread to avoid dead locking.
+            // Should be in another thread to avoid deadlocking.
             RpcUtils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTERNAL,
                 "Fail to send a RPC request:" + e.getMessage()));
 
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientService.java
similarity index 90%
rename from modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java
rename to modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientService.java
index cd0d6ca..1ca44ab 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientService.java
@@ -22,11 +22,12 @@ import org.apache.ignite.raft.closure.RpcResponseClosure;
 import org.apache.ignite.raft.rpc.CliRequests;
 import org.apache.ignite.raft.rpc.Message;
 import org.apache.ignite.raft.rpc.RpcRequests;
+import org.apache.ignite.raft.rpc.RpcRequests.PingRequest;
 
 /**
- * Cli RPC client service.
+ * Raft group RPC client service.
  */
-public interface CliClientService extends ClientService {
+public interface RaftGroupClientService extends ClientService {
     /**
      * Ping a node.
      *
@@ -35,7 +36,7 @@ public interface CliClientService extends ClientService {
      * @param done      callback
      * @return a future with result
      */
-    Future<Message> ping(Endpoint endpoint, RpcRequests.PingRequest request,
+    Future<Message> ping(Endpoint endpoint, PingRequest request,
                             RpcResponseClosure<RpcRequests.ErrorResponse> done);
 
     /**
@@ -161,4 +162,15 @@ public interface CliClientService extends ClientService {
      */
     Future<Message> getPeers(Endpoint endpoint, CliRequests.GetPeersRequest request,
                              RpcResponseClosure<CliRequests.GetPeersResponse> done);
+
+    /**
+     * Send custom request.
+     *
+     * @param endpoint  server address
+     * @param request   request data
+     * @param done      callback
+     * @return a future with result
+     */
+    Future<Message> sendCustom(Endpoint endpoint, Message request,
+                             RpcResponseClosure<Message> done);
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientServiceImpl.java
similarity index 94%
rename from modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java
rename to modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientServiceImpl.java
index f6211ff..cb59429 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientServiceImpl.java
@@ -43,7 +43,7 @@ import org.apache.ignite.raft.rpc.RpcRequests.ErrorResponse;
 /**
  *
  */
-public class CliClientServiceImpl extends AbstractClientService implements CliClientService {
+public class RaftGroupClientServiceImpl extends AbstractClientService implements RaftGroupClientService {
     @Override public Future<Message> ping(Endpoint endpoint, RpcRequests.PingRequest request,
                                           final RpcResponseClosure<ErrorResponse> done) {
         return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
@@ -114,4 +114,8 @@ public class CliClientServiceImpl extends AbstractClientService implements CliCl
                                     final RpcResponseClosure<GetPeersResponse> done) {
         return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
     }
+
+    @Override public Future<Message> sendCustom(Endpoint endpoint, Message request, RpcResponseClosure<Message> done) {
+        return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
+    }
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java
index 916a768..65e6f2b 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java
@@ -82,7 +82,7 @@ public class RouteTable {
 
     /**
      * Get the cached leader of the group, return it when found, null otherwise.
-     * Make sure calls {@link #refreshLeader(CliClientService, String, int)} already
+     * Make sure calls {@link #refreshLeader(RaftGroupClientService, String, int)} already
      * before invoke this method.
      *
      * @param groupId raft group id
@@ -174,7 +174,7 @@ public class RouteTable {
      * @param timeoutMs timeout millis
      * @return operation status
      */
-    public Status refreshLeader(final CliClientService cliClientService, final String groupId, final int timeoutMs)
+    public Status refreshLeader(final RaftGroupClientService raftGroupClientService, final String groupId, final int timeoutMs)
         throws InterruptedException,
         TimeoutException {
         final Configuration conf = getConfiguration(groupId);
@@ -188,7 +188,7 @@ public class RouteTable {
         final CliRequests.GetLeaderRequest request = rb.build();
         TimeoutException timeoutException = null;
         for (final PeerId peer : conf) {
-            if (!cliClientService.connect(peer.getEndpoint())) {
+            if (!raftGroupClientService.connect(peer.getEndpoint())) {
                 if (st.isOk()) {
                     st.setError(-1, "Fail to init channel to %s", peer);
                 } else {
@@ -197,7 +197,7 @@ public class RouteTable {
                 }
                 continue;
             }
-            final Future<Message> result = cliClientService.getLeader(peer.getEndpoint(), request, null);
+            final Future<Message> result = raftGroupClientService.getLeader(peer.getEndpoint(), request, null);
             try {
                 final Message msg = result.get(timeoutMs, TimeUnit.MILLISECONDS);
                 if (msg instanceof RpcRequests.ErrorResponse) {
@@ -230,7 +230,7 @@ public class RouteTable {
         return st;
     }
 
-    public Status refreshConfiguration(final CliClientService cliClientService, final String groupId,
+    public Status refreshConfiguration(final RaftGroupClientService raftGroupClientService, final String groupId,
                                        final int timeoutMs) throws InterruptedException, TimeoutException {
         final Configuration conf = getConfiguration(groupId);
         if (conf == null) {
@@ -240,14 +240,14 @@ public class RouteTable {
         final Status st = Status.OK();
         PeerId leaderId = selectLeader(groupId);
         if (leaderId == null) {
-            refreshLeader(cliClientService, groupId, timeoutMs);
+            refreshLeader(raftGroupClientService, groupId, timeoutMs);
             leaderId = selectLeader(groupId);
         }
         if (leaderId == null) {
             st.setError(-1, "Fail to get leader of group %s", groupId);
             return st;
         }
-        if (!cliClientService.connect(leaderId.getEndpoint())) {
+        if (!raftGroupClientService.connect(leaderId.getEndpoint())) {
             st.setError(-1, "Fail to init channel to %s", leaderId);
             return st;
         }
@@ -255,7 +255,7 @@ public class RouteTable {
         rb.setGroupId(groupId);
         rb.setLeaderId(leaderId.toString());
         try {
-            final Message result = cliClientService.getPeers(leaderId.getEndpoint(), rb.build(), null).get(timeoutMs,
+            final Message result = raftGroupClientService.getPeers(leaderId.getEndpoint(), rb.build(), null).get(timeoutMs,
                 TimeUnit.MILLISECONDS);
             if (result instanceof CliRequests.GetPeersResponse) {
                 final CliRequests.GetPeersResponse resp = (CliRequests.GetPeersResponse) result;
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftGroupClientServiceTest.java
similarity index 68%
rename from modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java
rename to modules/raft-client/src/test/java/org/apache/ignite/raft/RaftGroupClientServiceTest.java
index 54404a8..f3257fb 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftGroupClientServiceTest.java
@@ -7,23 +7,30 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.raft.closure.RpcResponseClosureAdapter;
+import org.apache.ignite.raft.rpc.CustomRequest;
+import org.apache.ignite.raft.rpc.CustomResponse;
 import org.apache.ignite.raft.rpc.Message;
 import org.apache.ignite.raft.rpc.MessageBuilderFactory;
 import org.apache.ignite.raft.rpc.RpcOptions;
 import org.apache.ignite.raft.rpc.RpcRequests;
 import org.apache.ignite.raft.rpc.RpcRequests.ErrorResponse;
+import org.apache.ignite.raft.rpc.TestCustomRequestProcessor;
 import org.apache.ignite.raft.rpc.TestGetLeaderRequestProcessor;
 import org.apache.ignite.raft.rpc.TestPingRequestProcessor;
 import org.apache.ignite.raft.rpc.impl.LocalRpcServer;
-import org.apache.ignite.raft.service.CliClientServiceImpl;
+import org.apache.ignite.raft.service.RaftGroupClientServiceImpl;
 import org.apache.ignite.raft.service.RouteTable;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class RaftClientTest {
-    private static final System.Logger LOG = System.getLogger(RaftClientTest.class.getName());
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RaftGroupClientServiceTest {
+    private static final System.Logger LOG = System.getLogger(RaftGroupClientServiceTest.class.getName());
 
     private static final Configuration initConf = new Configuration(Arrays.asList(
         new PeerId("127.0.0.1", 8080),
@@ -39,6 +46,7 @@ public class RaftClientTest {
             LocalRpcServer srv = new LocalRpcServer(peer.getEndpoint());
             srv.registerProcessor(new TestPingRequestProcessor());
             srv.registerProcessor(new TestGetLeaderRequestProcessor());
+            srv.registerProcessor(new TestCustomRequestProcessor());
             srv.init(new RpcOptions());
             srvs.add(srv);
         }
@@ -54,29 +62,42 @@ public class RaftClientTest {
     }
 
     @Test
+    public void testCustom() throws TimeoutException, InterruptedException, ExecutionException {
+        final RaftGroupClientServiceImpl cliClientService = new RaftGroupClientServiceImpl();
+        cliClientService.init(new RpcOptions());
+
+        Future<Message> resp = cliClientService.sendCustom(initConf.getPeers().get(0).getEndpoint(), new CustomRequest(), null);
+
+        assertTrue(resp.get() instanceof CustomResponse);
+    }
+
+    @Test
     public void testPing() throws TimeoutException, InterruptedException, ExecutionException {
         String groupId = "unittest";
 
         RouteTable.getInstance().updateConfiguration(groupId, initConf);
 
-        final CliClientServiceImpl cliClientService = new CliClientServiceImpl();
+        final RaftGroupClientServiceImpl cliClientService = new RaftGroupClientServiceImpl();
         cliClientService.init(new RpcOptions());
 
         RpcRequests.PingRequest.Builder builder = MessageBuilderFactory.DEFAULT.createPingRequest();
         builder.setSendTimestamp(System.currentTimeMillis());
         RpcRequests.PingRequest req = builder.build();
 
+        AtomicReference<Status> ref = new AtomicReference<>();
+
         RpcResponseClosureAdapter<ErrorResponse> done = new RpcResponseClosureAdapter<>() {
             @Override public void run(Status status) {
-                System.out.println();
+                ref.set(status);
             }
         };
 
         Future<Message> resp = cliClientService.ping(initConf.getPeers().get(0).getEndpoint(), req, done);
 
-        Message msg = resp.get();
+        ErrorResponse msg = (ErrorResponse) resp.get();
 
-        System.out.println();
+        assertEquals(RaftError.SUCCESS.getNumber(), msg.getErrorCode());
+        assertTrue(ref.get().isOk());
     }
 
     @Test
@@ -93,10 +114,10 @@ public class RaftClientTest {
 
         RouteTable.getInstance().updateConfiguration(groupId, initConf);
 
-        final CliClientServiceImpl cliClientService = new CliClientServiceImpl();
-        cliClientService.init(new RpcOptions());
+        final RaftGroupClientServiceImpl raftGroupClientService = new RaftGroupClientServiceImpl();
+        raftGroupClientService.init(new RpcOptions());
 
-        if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) {
+        if (!RouteTable.getInstance().refreshLeader(raftGroupClientService, groupId, 1000).isOk()) {
             throw new IllegalStateException("Refresh leader failed");
         }
 
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomRequest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomRequest.java
new file mode 100644
index 0000000..8f53d99
--- /dev/null
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomRequest.java
@@ -0,0 +1,4 @@
+package org.apache.ignite.raft.rpc;
+
+public class CustomRequest implements Message {
+}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomResponse.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomResponse.java
new file mode 100644
index 0000000..b795f2c
--- /dev/null
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomResponse.java
@@ -0,0 +1,4 @@
+package org.apache.ignite.raft.rpc;
+
+public class CustomResponse implements Message {
+}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestCustomRequestProcessor.java
similarity index 60%
copy from modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
copy to modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestCustomRequestProcessor.java
index c9e76c2..a5c64a0 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestCustomRequestProcessor.java
@@ -17,20 +17,17 @@
 package org.apache.ignite.raft.rpc;
 
 
-import org.apache.ignite.raft.rpc.CliRequests.GetLeaderRequest;
-import org.apache.ignite.raft.rpc.RpcRequests.PingRequest;
-
 /**
  * Ping request processor.
  */
-public class TestGetLeaderRequestProcessor implements RpcProcessor<GetLeaderRequest> {
-    @Override public void handleRequest(RpcContext rpcCtx, GetLeaderRequest request) {
-        CliRequests.GetLeaderResponse.Builder resp = MessageBuilderFactory.DEFAULT.createGetLeaderResponse();
-        resp.setLeaderId("127.0.0.1:8081");
-        rpcCtx.sendResponse(resp.build());
+public class TestCustomRequestProcessor implements RpcProcessor<CustomRequest> {
+    @Override
+    public void handleRequest(final RpcContext rpcCtx, final CustomRequest request) {
+        rpcCtx.sendResponse(new CustomResponse());
     }
 
-    @Override public String interest() {
-        return GetLeaderRequest.class.getName();
+    @Override
+    public String interest() {
+        return CustomRequest.class.getName();
     }
 }
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
index c9e76c2..934a2ec 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
@@ -18,10 +18,9 @@ package org.apache.ignite.raft.rpc;
 
 
 import org.apache.ignite.raft.rpc.CliRequests.GetLeaderRequest;
-import org.apache.ignite.raft.rpc.RpcRequests.PingRequest;
 
 /**
- * Ping request processor.
+ * Get leader request processor.
  */
 public class TestGetLeaderRequestProcessor implements RpcProcessor<GetLeaderRequest> {
     @Override public void handleRequest(RpcContext rpcCtx, GetLeaderRequest request) {