You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/10 12:46:00 UTC
[ignite-3] branch ignite-13885 updated: IGNITE-13885 Raft client
wip 3.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch ignite-13885
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-13885 by this push:
new 51438cd IGNITE-13885 Raft client wip 3.
51438cd is described below
commit 51438cdc043f312db77a365b2da201cdd2f6c255
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Wed Feb 10 15:45:49 2021 +0300
IGNITE-13885 Raft client wip 3.
---
.../java/org/apache/ignite/raft/rpc/RpcUtils.java | 9 +++
.../ignite/raft/rpc/message/ErrorResponseImpl.java | 23 ++++---
.../ignite/raft/service/CliClientService.java | 11 ++++
.../ignite/raft/service/CliClientServiceImpl.java | 6 ++
.../apache/ignite/raft/service/ClientService.java | 1 -
.../org/apache/ignite/raft/RaftClientTest.java | 70 +++++++++++++++++++++-
.../raft/rpc/TestGetLeaderRequestProcessor.java | 36 +++++++++++
.../ignite/raft/rpc/TestPingRequestProcessor.java | 37 ++++++++++++
8 files changed, 183 insertions(+), 10 deletions(-)
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcUtils.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcUtils.java
index 9bbaaa3..8e6d896 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcUtils.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/RpcUtils.java
@@ -106,6 +106,15 @@ public final class RpcUtils {
});
}
+ public static Message newResponse(final int code, final String fmt, final Object... args) {
+ final RpcRequests.ErrorResponse.Builder eBuilder = RpcRequests.ErrorResponse.newBuilder();
+ eBuilder.setErrorCode(code);
+ if (fmt != null) {
+ eBuilder.setErrorMsg(String.format(fmt, args));
+ }
+ return eBuilder.build();
+ }
+
private RpcUtils() {
}
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ErrorResponseImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ErrorResponseImpl.java
index ad1f2b7..e29c7ea 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ErrorResponseImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/message/ErrorResponseImpl.java
@@ -3,23 +3,30 @@ package org.apache.ignite.raft.rpc.message;
import org.apache.ignite.raft.rpc.RpcRequests;
class ErrorResponseImpl implements RpcRequests.ErrorResponse, RpcRequests.ErrorResponse.Builder {
+ private int errorCode;
+ private String errorMsg = "";
+
@Override public int getErrorCode() {
- return 0;
+ return errorCode;
}
- @Override public String getErrorMsg() {
- return null;
+ @Override public Builder setErrorCode(int errorCode) {
+ this.errorCode = errorCode;
+
+ return this;
}
- @Override public Builder setErrorCode(int code) {
- return null;
+ @Override public String getErrorMsg() {
+ return errorMsg;
}
- @Override public Builder setErrorMsg(String msg) {
- return null;
+ @Override public Builder setErrorMsg(String errorMsg) {
+ this.errorMsg = errorMsg;
+
+ return this;
}
@Override public RpcRequests.ErrorResponse build() {
- return null;
+ return this;
}
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java
index 8082191..cd0d6ca 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java
@@ -28,6 +28,17 @@ import org.apache.ignite.raft.rpc.RpcRequests;
*/
public interface CliClientService extends ClientService {
/**
+ * Ping a node.
+ *
+ * @param endpoint server address
+ * @param request request data
+ * @param done callback
+ * @return a future with result
+ */
+ Future<Message> ping(Endpoint endpoint, RpcRequests.PingRequest request,
+ RpcResponseClosure<RpcRequests.ErrorResponse> done);
+
+ /**
* Adds a peer.
*
* @param endpoint server address
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java
index 94ec3a2..f6211ff 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java
@@ -37,12 +37,18 @@ import org.apache.ignite.raft.rpc.CliRequests.ResetLearnersRequest;
import org.apache.ignite.raft.rpc.CliRequests.ResetPeerRequest;
import org.apache.ignite.raft.rpc.CliRequests.TransferLeaderRequest;
import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.RpcRequests;
import org.apache.ignite.raft.rpc.RpcRequests.ErrorResponse;
/**
*
*/
public class CliClientServiceImpl extends AbstractClientService implements CliClientService {
+ @Override public Future<Message> ping(Endpoint endpoint, RpcRequests.PingRequest request,
+ final RpcResponseClosure<ErrorResponse> done) {
+ return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout());
+ }
+
@Override
public Future<Message> addPeer(final Endpoint endpoint, final AddPeerRequest request,
final RpcResponseClosure<AddPeerResponse> done) {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/ClientService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/ClientService.java
index ef4976d..7f6482c 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/ClientService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/ClientService.java
@@ -25,7 +25,6 @@ import org.apache.ignite.raft.rpc.RpcOptions;
/**
* RPC client service.
- * TODO not needed
*/
public interface ClientService extends Lifecycle<RpcOptions> {
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java
index 0b1e8e4..54404a8 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java
@@ -1,18 +1,86 @@
package org.apache.ignite.raft;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
+import org.apache.ignite.raft.closure.RpcResponseClosureAdapter;
+import org.apache.ignite.raft.rpc.Message;
+import org.apache.ignite.raft.rpc.MessageBuilderFactory;
import org.apache.ignite.raft.rpc.RpcOptions;
+import org.apache.ignite.raft.rpc.RpcRequests;
+import org.apache.ignite.raft.rpc.RpcRequests.ErrorResponse;
+import org.apache.ignite.raft.rpc.TestGetLeaderRequestProcessor;
+import org.apache.ignite.raft.rpc.TestPingRequestProcessor;
+import org.apache.ignite.raft.rpc.impl.LocalRpcServer;
import org.apache.ignite.raft.service.CliClientServiceImpl;
import org.apache.ignite.raft.service.RouteTable;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
public class RaftClientTest {
private static final System.Logger LOG = System.getLogger(RaftClientTest.class.getName());
+ private static final Configuration initConf = new Configuration(Arrays.asList(
+ new PeerId("127.0.0.1", 8080),
+ new PeerId("127.0.0.1", 8081),
+ new PeerId("127.0.0.1", 8082)
+ ));
+
+ private Set<LocalRpcServer> srvs = new HashSet<>();
+
+ @Before
+ public void beforeTest() {
+ for (PeerId peer : initConf.getPeers()) {
+ LocalRpcServer srv = new LocalRpcServer(peer.getEndpoint());
+ srv.registerProcessor(new TestPingRequestProcessor());
+ srv.registerProcessor(new TestGetLeaderRequestProcessor());
+ srv.init(new RpcOptions());
+ srvs.add(srv);
+ }
+ }
+
+ @After
+ public void afterTest() {
+ for (LocalRpcServer srv : srvs) {
+ srv.shutdown();
+ }
+
+ srvs.clear();
+ }
+
+ @Test
+ public void testPing() throws TimeoutException, InterruptedException, ExecutionException {
+ String groupId = "unittest";
+
+ RouteTable.getInstance().updateConfiguration(groupId, initConf);
+
+ final CliClientServiceImpl cliClientService = new CliClientServiceImpl();
+ cliClientService.init(new RpcOptions());
+
+ RpcRequests.PingRequest.Builder builder = MessageBuilderFactory.DEFAULT.createPingRequest();
+ builder.setSendTimestamp(System.currentTimeMillis());
+ RpcRequests.PingRequest req = builder.build();
+
+ RpcResponseClosureAdapter<ErrorResponse> done = new RpcResponseClosureAdapter<>() {
+ @Override public void run(Status status) {
+ System.out.println();
+ }
+ };
+
+ Future<Message> resp = cliClientService.ping(initConf.getPeers().get(0).getEndpoint(), req, done);
+
+ Message msg = resp.get();
+
+ System.out.println();
+ }
+
@Test
- public void testClient() throws TimeoutException, InterruptedException {
+ public void testRefreshLeader() throws TimeoutException, InterruptedException {
List<PeerId> peers = Arrays.asList(
new PeerId("127.0.0.1", 8080),
new PeerId("127.0.0.1", 8081),
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
new file mode 100644
index 0000000..c9e76c2
--- /dev/null
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+
+import org.apache.ignite.raft.rpc.CliRequests.GetLeaderRequest;
+import org.apache.ignite.raft.rpc.RpcRequests.PingRequest;
+
+/**
+ * Ping request processor.
+ */
+public class TestGetLeaderRequestProcessor implements RpcProcessor<GetLeaderRequest> {
+ @Override public void handleRequest(RpcContext rpcCtx, GetLeaderRequest request) {
+ CliRequests.GetLeaderResponse.Builder resp = MessageBuilderFactory.DEFAULT.createGetLeaderResponse();
+ resp.setLeaderId("127.0.0.1:8081");
+ rpcCtx.sendResponse(resp.build());
+ }
+
+ @Override public String interest() {
+ return GetLeaderRequest.class.getName();
+ }
+}
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestPingRequestProcessor.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestPingRequestProcessor.java
new file mode 100644
index 0000000..af558c5
--- /dev/null
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestPingRequestProcessor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.rpc;
+
+
+import org.apache.ignite.raft.rpc.RpcRequests.PingRequest;
+
+/**
+ * Ping request processor.
+ */
+public class TestPingRequestProcessor implements RpcProcessor<PingRequest> {
+ @Override
+ public void handleRequest(final RpcContext rpcCtx, final PingRequest request) {
+ Message resp = RpcUtils.newResponse(0, "OK");
+
+ rpcCtx.sendResponse(resp);
+ }
+
+ @Override
+ public String interest() {
+ return PingRequest.class.getName();
+ }
+}