You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/10 12:46:00 UTC

[ignite-3] branch ignite-13885 updated: IGNITE-13885 Raft client wip 3.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch ignite-13885
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-13885 by this push:
     new 51438cd  IGNITE-13885 Raft client wip 3.
51438cd is described below

commit 51438cdc043f312db77a365b2da201cdd2f6c255
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Wed Feb 10 15:45:49 2021 +0300

    IGNITE-13885 Raft client wip 3.
---
 .../java/org/apache/ignite/raft/rpc/RpcUtils.java  |  9 +++
 .../ignite/raft/rpc/message/ErrorResponseImpl.java | 23 ++++---
 .../ignite/raft/service/CliClientService.java      | 11 ++++
 .../ignite/raft/service/CliClientServiceImpl.java  |  6 ++
 .../apache/ignite/raft/service/ClientService.java  |  1 -
 .../org/apache/ignite/raft/RaftClientTest.java     | 70 +++++++++++++++++++++-
 .../raft/rpc/TestGetLeaderRequestProcessor.java    | 36 +++++++++++
 .../ignite/raft/rpc/TestPingRequestProcessor.java  | 37 ++++++++++++
 8 files changed, 183 insertions(+), 10 deletions(-)

diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcUtils.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcUtils.java
index 9bbaaa3..8e6d896 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcUtils.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcUtils.java
@@ -106,6 +106,15 @@ public final class RpcUtils {
         });
     }
 
+    public static Message newResponse(final int code, final String fmt, final Object... args) {
+        final RpcRequests.ErrorResponse.Builder eBuilder = RpcRequests.ErrorResponse.newBuilder();
+        eBuilder.setErrorCode(code);
+        if (fmt != null) {
+            eBuilder.setErrorMsg(String.format(fmt, args));
+        }
+        return eBuilder.build();
+    }
+
     private RpcUtils() {
     }
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ErrorResponseImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ErrorResponseImpl.java
index ad1f2b7..e29c7ea 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ErrorResponseImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ErrorResponseImpl.java
@@ -3,23 +3,30 @@ package org.apache.ignite.raft.rpc.message;
 import org.apache.ignite.raft.rpc.RpcRequests;
 
 class ErrorResponseImpl implements RpcRequests.ErrorResponse, RpcRequests.ErrorResponse.Builder {
+    private int errorCode;
+    private String errorMsg = "";
+
     @Override public int getErrorCode() {
-        return 0;
+        return errorCode;
     }
 
-    @Override public String getErrorMsg() {
-        return null;
+    @Override public Builder setErrorCode(int errorCode) {
+        this.errorCode = errorCode;
+
+        return this;
     }
 
-    @Override public Builder setErrorCode(int code) {
-        return null;
+    @Override public String getErrorMsg() {
+        return errorMsg;
     }
 
-    @Override public Builder setErrorMsg(String msg) {
-        return null;
+    @Override public Builder setErrorMsg(String errorMsg) {
+        this.errorMsg = errorMsg;
+
+        return this;
     }
 
     @Override public RpcRequests.ErrorResponse build() {
-        return null;
+        return this;
     }
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java
index 8082191..cd0d6ca 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java
@@ -28,6 +28,17 @@ import org.apache.ignite.raft.rpc.RpcRequests;
  */
 public interface CliClientService extends ClientService {
     /**
+     * Ping a node.
+     *
+     * @param endpoint  server address
+     * @param request   request data
+     * @param done      callback
+     * @return a future with result
+     */
+    Future<Message> ping(Endpoint endpoint, RpcRequests.PingRequest request,
+                            RpcResponseClosure<RpcRequests.ErrorResponse> done);
+
+    /**
      * Adds a peer.
      *
      * @param endpoint  server address
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java
index 94ec3a2..f6211ff 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java
@@ -37,12 +37,18 @@ import org.apache.ignite.raft.rpc.CliRequests.ResetLearnersRequest;
 import org.apache.ignite.raft.rpc.CliRequests.ResetPeerRequest;
 import org.apache.ignite.raft.rpc.CliRequests.TransferLeaderRequest;
 import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.RpcRequests;
 import org.apache.ignite.raft.rpc.RpcRequests.ErrorResponse;
 
 /**
  *
  */
 public class CliClientServiceImpl extends AbstractClientService implements CliClientService {
+    @Override public Future<Message> ping(Endpoint endpoint, RpcRequests.PingRequest request,
+                                          final RpcResponseClosure<ErrorResponse> done) {
+        return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
+    }
+
     @Override
     public Future<Message> addPeer(final Endpoint endpoint, final AddPeerRequest request,
                                    final RpcResponseClosure<AddPeerResponse> done) {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/ClientService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/ClientService.java
index ef4976d..7f6482c 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/ClientService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/ClientService.java
@@ -25,7 +25,6 @@ import org.apache.ignite.raft.rpc.RpcOptions;
 
 /**
  * RPC client service.
- * TODO not needed
  */
 public interface ClientService extends Lifecycle<RpcOptions> {
 
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java
index 0b1e8e4..54404a8 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java
@@ -1,18 +1,86 @@
 package org.apache.ignite.raft;
 
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
+import org.apache.ignite.raft.closure.RpcResponseClosureAdapter;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.MessageBuilderFactory;
 import org.apache.ignite.raft.rpc.RpcOptions;
+import org.apache.ignite.raft.rpc.RpcRequests;
+import org.apache.ignite.raft.rpc.RpcRequests.ErrorResponse;
+import org.apache.ignite.raft.rpc.TestGetLeaderRequestProcessor;
+import org.apache.ignite.raft.rpc.TestPingRequestProcessor;
+import org.apache.ignite.raft.rpc.impl.LocalRpcServer;
 import org.apache.ignite.raft.service.CliClientServiceImpl;
 import org.apache.ignite.raft.service.RouteTable;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 public class RaftClientTest {
     private static final System.Logger LOG = System.getLogger(RaftClientTest.class.getName());
 
+    private static final Configuration initConf = new Configuration(Arrays.asList(
+        new PeerId("127.0.0.1", 8080),
+        new PeerId("127.0.0.1", 8081),
+        new PeerId("127.0.0.1", 8082)
+    ));
+
+    private Set<LocalRpcServer> srvs = new HashSet<>();
+
+    @Before
+    public void beforeTest() {
+        for (PeerId peer : initConf.getPeers()) {
+            LocalRpcServer srv = new LocalRpcServer(peer.getEndpoint());
+            srv.registerProcessor(new TestPingRequestProcessor());
+            srv.registerProcessor(new TestGetLeaderRequestProcessor());
+            srv.init(new RpcOptions());
+            srvs.add(srv);
+        }
+    }
+
+    @After
+    public void afterTest() {
+        for (LocalRpcServer srv : srvs) {
+            srv.shutdown();
+        }
+
+        srvs.clear();
+    }
+
+    @Test
+    public void testPing() throws TimeoutException, InterruptedException, ExecutionException {
+        String groupId = "unittest";
+
+        RouteTable.getInstance().updateConfiguration(groupId, initConf);
+
+        final CliClientServiceImpl cliClientService = new CliClientServiceImpl();
+        cliClientService.init(new RpcOptions());
+
+        RpcRequests.PingRequest.Builder builder = MessageBuilderFactory.DEFAULT.createPingRequest();
+        builder.setSendTimestamp(System.currentTimeMillis());
+        RpcRequests.PingRequest req = builder.build();
+
+        RpcResponseClosureAdapter<ErrorResponse> done = new RpcResponseClosureAdapter<>() {
+            @Override public void run(Status status) {
+                System.out.println();
+            }
+        };
+
+        Future<Message> resp = cliClientService.ping(initConf.getPeers().get(0).getEndpoint(), req, done);
+
+        Message msg = resp.get();
+
+        System.out.println();
+    }
+
     @Test
-    public void testClient() throws TimeoutException, InterruptedException {
+    public void testRefreshLeader() throws TimeoutException, InterruptedException {
         List<PeerId> peers = Arrays.asList(
             new PeerId("127.0.0.1", 8080),
             new PeerId("127.0.0.1", 8081),
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
new file mode 100644
index 0000000..c9e76c2
--- /dev/null
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+
+import org.apache.ignite.raft.rpc.CliRequests.GetLeaderRequest;
+import org.apache.ignite.raft.rpc.RpcRequests.PingRequest;
+
+/**
+ * Test GetLeader request processor: replies to every request with a fixed leader id.
+ */
+public class TestGetLeaderRequestProcessor implements RpcProcessor<GetLeaderRequest> {
+    @Override public void handleRequest(RpcContext rpcCtx, GetLeaderRequest request) {
+        CliRequests.GetLeaderResponse.Builder resp = MessageBuilderFactory.DEFAULT.createGetLeaderResponse();
+        resp.setLeaderId("127.0.0.1:8081");
+        rpcCtx.sendResponse(resp.build());
+    }
+
+    @Override public String interest() {
+        return GetLeaderRequest.class.getName();
+    }
+}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestPingRequestProcessor.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestPingRequestProcessor.java
new file mode 100644
index 0000000..af558c5
--- /dev/null
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestPingRequestProcessor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+
+import org.apache.ignite.raft.rpc.RpcRequests.PingRequest;
+
+/**
+ * Test ping request processor: replies to every ping with error code 0 and message "OK".
+ */
+public class TestPingRequestProcessor implements RpcProcessor<PingRequest> {
+    @Override
+    public void handleRequest(final RpcContext rpcCtx, final PingRequest request) {
+        Message resp = RpcUtils.newResponse(0, "OK");
+
+        rpcCtx.sendResponse(resp);
+    }
+
+    @Override
+    public String interest() {
+        return PingRequest.class.getName();
+    }
+}