You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/06/16 09:13:35 UTC
[ignite-3] branch main updated: IGNITE-14909 Use local address instead of external in AbstractRpcTest (#177)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d1c4f34 IGNITE-14909 Use local address instead of external in AbstractRpcTest (#177)
d1c4f34 is described below
commit d1c4f340c4deb9fd3929280db6e5119a18bf8c56
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Wed Jun 16 12:13:29 2021 +0300
IGNITE-14909 Use local address instead of external in AbstractRpcTest (#177)
---
.../apache/ignite/raft/jraft/rpc/RpcClient.java | 22 ++--
.../raft/jraft/rpc/impl/IgniteRpcClient.java | 12 ++
.../ignite/raft/jraft/rpc/AbstractRpcTest.java | 139 ++++++++++-----------
.../ignite/raft/jraft/rpc/IgniteRpcTest.java | 14 +--
4 files changed, 96 insertions(+), 91 deletions(-)
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
index cb8530c..5e473c8 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
@@ -21,26 +21,26 @@ import org.apache.ignite.raft.jraft.Lifecycle;
import org.apache.ignite.raft.jraft.error.RemotingException;
import org.apache.ignite.raft.jraft.option.RpcOptions;
import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.jetbrains.annotations.Nullable;
/**
*
*/
public interface RpcClient extends Lifecycle<RpcOptions> {
-
/**
* Check connection for given address. // TODO asch rename to isAlive.
*
* @param endpoint target address
* @return true if there is a connection and the connection is active and writable.
*/
- boolean checkConnection(final Endpoint endpoint);
+ boolean checkConnection(Endpoint endpoint);
/**
* Register a connect event listener for the handler.
*
* @param handler The handler.
*/
- void registerConnectEventListener(final TopologyEventHandler handler);
+ void registerConnectEventListener(TopologyEventHandler handler);
/**
* Synchronous invocation.
@@ -50,7 +50,7 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
* @param timeoutMs timeout millisecond
* @return invoke result
*/
- default Object invokeSync(final Endpoint endpoint, final Object request, final long timeoutMs)
+ default Object invokeSync(Endpoint endpoint, Object request, long timeoutMs)
throws InterruptedException, RemotingException {
return invokeSync(endpoint, request, null, timeoutMs);
}
@@ -64,8 +64,8 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
* @param timeoutMs timeout millisecond
* @return invoke result
*/
- Object invokeSync(final Endpoint endpoint, final Object request, final InvokeContext ctx,
- final long timeoutMs) throws InterruptedException, RemotingException;
+ Object invokeSync(Endpoint endpoint, Object request, @Nullable InvokeContext ctx,
+ long timeoutMs) throws InterruptedException, RemotingException;
/**
* Asynchronous invocation with a callback.
@@ -75,8 +75,8 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
* @param callback invoke callback
* @param timeoutMs timeout millisecond
*/
- default void invokeAsync(final Endpoint endpoint, final Object request, final InvokeCallback callback,
- final long timeoutMs) throws InterruptedException, RemotingException {
+ default void invokeAsync(Endpoint endpoint, Object request, InvokeCallback callback,
+ long timeoutMs) throws InterruptedException, RemotingException {
invokeAsync(endpoint, request, null, callback, timeoutMs);
}
@@ -89,7 +89,7 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
* @param callback invoke callback
* @param timeoutMs timeout millisecond
*/
- void invokeAsync(final Endpoint endpoint, final Object request, final InvokeContext ctx,
- final InvokeCallback callback,
- final long timeoutMs) throws InterruptedException, RemotingException;
+ void invokeAsync(Endpoint endpoint, Object request, @Nullable InvokeContext ctx,
+ InvokeCallback callback,
+ long timeoutMs) throws InterruptedException, RemotingException;
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
index 9d4c890..f0d0495 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
@@ -65,14 +65,17 @@ public class IgniteRpcClient implements RpcClientEx {
return service;
}
+ /** {@inheritDoc} */
@Override public boolean checkConnection(Endpoint endpoint) {
return service.topologyService().getByAddress(endpoint.toString()) != null;
}
+ /** {@inheritDoc} */
@Override public void registerConnectEventListener(TopologyEventHandler handler) {
service.topologyService().addEventHandler(handler);
}
+ /** {@inheritDoc} */
@Override public Object invokeSync(Endpoint endpoint, Object request, InvokeContext ctx,
long timeoutMs) throws InterruptedException, RemotingException {
if (!checkConnection(endpoint))
@@ -112,6 +115,7 @@ public class IgniteRpcClient implements RpcClientEx {
}
}
+ /** {@inheritDoc} */
@Override public void invokeAsync(Endpoint endpoint, Object request, InvokeContext ctx, InvokeCallback callback,
long timeoutMs) throws InterruptedException, RemotingException {
if (!checkConnection(endpoint))
@@ -170,10 +174,12 @@ public class IgniteRpcClient implements RpcClientEx {
});
}
+ /** {@inheritDoc} */
@Override public boolean init(RpcOptions opts) {
return true;
}
+ /** {@inheritDoc} */
@Override public void shutdown() {
try {
if (!reuse)
@@ -184,10 +190,12 @@ public class IgniteRpcClient implements RpcClientEx {
}
}
+ /** {@inheritDoc} */
@Override public void blockMessages(BiPredicate<Object, String> predicate) {
this.blockPred = predicate;
}
+ /** {@inheritDoc} */
@Override public void stopBlock() {
ArrayList<Object[]> msgs = new ArrayList<>();
@@ -204,18 +212,22 @@ public class IgniteRpcClient implements RpcClientEx {
}
}
+ /** {@inheritDoc} */
@Override public void recordMessages(BiPredicate<Object, String> predicate) {
this.recordPred = predicate;
}
+ /** {@inheritDoc} */
@Override public void stopRecord() {
this.recordPred = null;
}
+ /** {@inheritDoc} */
@Override public Queue<Object[]> recordedMessages() {
return recordedMsgs;
}
+ /** {@inheritDoc} */
@Override public Queue<Object[]> blockedMessages() {
return blockedMsgs;
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
index bad806c..278c03d 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.ignite.raft.jraft.rpc;
+import java.net.Inet4Address;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
@@ -26,32 +28,34 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.raft.jraft.error.RemotingException;
-import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.Utils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import static org.apache.ignite.raft.jraft.test.TestUtils.INIT_PORT;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
/**
*
*/
public abstract class AbstractRpcTest {
protected Endpoint endpoint;
- protected List<RpcServer> servers = new ArrayList<>();
- @Before
- public void setup() {
- endpoint = new Endpoint(TestUtils.getMyIp(), INIT_PORT);
- RpcServer server = createServer(endpoint);
+ private final List<RpcServer<?>> servers = new ArrayList<>();
+
+ private final List<RpcClient> clients = new ArrayList<>();
+
+ @BeforeEach
+ public void setup() throws UnknownHostException {
+ endpoint = new Endpoint(Inet4Address.getLocalHost().getHostAddress(), INIT_PORT);
+
+ RpcServer<?> server = createServer(endpoint);
server.registerProcessor(new Request1RpcProcessor());
server.registerProcessor(new Request2RpcProcessor());
server.init(null);
@@ -59,47 +63,50 @@ public abstract class AbstractRpcTest {
servers.add(server);
}
- @After
- public void teardown() {
- for (RpcServer server : servers) {
- server.shutdown();
- }
+ @AfterEach
+ public void tearDown() {
+ clients.forEach(RpcClient::shutdown);
+ servers.forEach(RpcServer::shutdown);
}
/**
* @param endpoint The endpoint.
* @return The server.
*/
- public abstract RpcServer createServer(Endpoint endpoint);
+ public abstract RpcServer<?> createServer(Endpoint endpoint);
/**
* @return The client.
*/
- public abstract RpcClient createClient();
+ private RpcClient createClient() {
+ RpcClient client = createClient0();
+
+ clients.add(client);
+
+ return client;
+ }
+
+ public abstract RpcClient createClient0();
@Test
public void testConnection() {
RpcClient client = createClient();
assertTrue(client.checkConnection(endpoint));
-
- client.shutdown();
}
@Test
- public void testSyncProcessing() throws RemotingException, InterruptedException {
+ public void testSyncProcessing() throws Exception {
RpcClient client = createClient();
Response1 resp1 = (Response1) client.invokeSync(endpoint, new Request1(), new InvokeContext(), 5000);
assertNotNull(resp1);
Response2 resp2 = (Response2) client.invokeSync(endpoint, new Request2(), new InvokeContext(), 5000);
assertNotNull(resp2);
-
- client.shutdown();
}
@Test
- public void testAsyncProcessing() throws RemotingException, InterruptedException {
+ public void testAsyncProcessing() throws Exception {
RpcClient client = createClient();
CountDownLatch l1 = new CountDownLatch(1);
@@ -119,35 +126,27 @@ public abstract class AbstractRpcTest {
}, 5000);
l2.await(5_000, TimeUnit.MILLISECONDS);
assertNotNull(resp2);
-
- client.shutdown();
}
@Test
- public void testDisconnect() throws Exception {
+ public void testDisconnect() {
RpcClient client1 = createClient();
RpcClient client2 = createClient();
- try {
- assertTrue(client1.checkConnection(endpoint));
- assertTrue(client2.checkConnection(endpoint));
+ assertTrue(client1.checkConnection(endpoint));
+ assertTrue(client2.checkConnection(endpoint));
- servers.get(0).shutdown();
+ servers.get(0).shutdown();
- assertTrue(waitForTopology(client1, 2, 5_000));
- assertTrue(waitForTopology(client2, 2, 5_000));
+ assertTrue(waitForTopology(client1, 2, 5_000));
+ assertTrue(waitForTopology(client2, 2, 5_000));
- assertFalse(client1.checkConnection(endpoint));
- assertFalse(client2.checkConnection(endpoint));
- }
- finally {
- client1.shutdown();
- client2.shutdown();
- }
+ assertFalse(client1.checkConnection(endpoint));
+ assertFalse(client2.checkConnection(endpoint));
}
@Test
- public void testRecordedSync() throws RemotingException, InterruptedException {
+ public void testRecordedSync() throws Exception {
RpcClientEx client1 = (RpcClientEx) createClient();
client1.recordMessages((a, b) -> true);
@@ -166,12 +165,10 @@ public abstract class AbstractRpcTest {
assertTrue(recorded.poll()[0] instanceof Response1);
assertTrue(recorded.poll()[0] instanceof Request2);
assertTrue(recorded.poll()[0] instanceof Response2);
-
- client1.shutdown();
}
@Test
- public void testRecordedSyncTimeout() throws RemotingException, InterruptedException {
+ public void testRecordedSyncTimeout() {
RpcClientEx client1 = (RpcClientEx) createClient();
client1.recordMessages((a, b) -> true);
@@ -184,7 +181,7 @@ public abstract class AbstractRpcTest {
fail();
}
- catch (Exception e) {
+ catch (Exception ignored) {
// Expected.
}
@@ -192,12 +189,10 @@ public abstract class AbstractRpcTest {
assertEquals(1, recorded.size());
assertTrue(recorded.poll()[0] instanceof Request1);
-
- client1.shutdown();
}
@Test
- public void testRecordedAsync() throws RemotingException, InterruptedException {
+ public void testRecordedAsync() throws Exception {
RpcClientEx client1 = (RpcClientEx) createClient();
client1.recordMessages((a, b) -> true);
@@ -213,12 +208,10 @@ public abstract class AbstractRpcTest {
Queue<Object[]> recorded = client1.recordedMessages();
assertEquals(4, recorded.size());
-
- client1.shutdown();
}
@Test
- public void testRecordedAsyncTimeout() throws RemotingException, InterruptedException {
+ public void testRecordedAsyncTimeout() {
RpcClientEx client1 = (RpcClientEx) createClient();
client1.recordMessages((a, b) -> true);
@@ -227,7 +220,7 @@ public abstract class AbstractRpcTest {
try {
Request1 request = new Request1();
request.val = 10_000;
- CompletableFuture fut = new CompletableFuture<>();
+ CompletableFuture<Object> fut = new CompletableFuture<>();
client1.invokeAsync(endpoint, request, null, (result, err) -> {
if (err == null)
@@ -240,7 +233,7 @@ public abstract class AbstractRpcTest {
fail();
}
- catch (Exception e) {
+ catch (Exception ignored) {
// Expected.
}
@@ -248,8 +241,6 @@ public abstract class AbstractRpcTest {
assertEquals(1, recorded.size());
assertTrue(recorded.poll()[0] instanceof Request1);
-
- client1.shutdown();
}
@Test
@@ -279,8 +270,6 @@ public abstract class AbstractRpcTest {
client1.stopBlock();
resp.get(5_000, TimeUnit.MILLISECONDS);
-
- client1.shutdown();
}
@Test
@@ -290,13 +279,9 @@ public abstract class AbstractRpcTest {
assertTrue(client1.checkConnection(endpoint));
- CompletableFuture resp = new CompletableFuture();
+ CompletableFuture<Object> resp = new CompletableFuture<>();
- client1.invokeAsync(endpoint, new Request1(), new InvokeCallback() {
- @Override public void complete(Object result, Throwable err) {
- resp.complete(result);
- }
- }, 30_000);
+ client1.invokeAsync(endpoint, new Request1(), (result, err) -> resp.complete(result), 30_000);
Thread.sleep(500);
@@ -309,11 +294,11 @@ public abstract class AbstractRpcTest {
client1.stopBlock();
resp.get(5_000, TimeUnit.MILLISECONDS);
-
- client1.shutdown();
}
- protected static class Request1RpcProcessor implements RpcProcessor<Request1> {
+ /** */
+ private static class Request1RpcProcessor implements RpcProcessor<Request1> {
+ /** {@inheritDoc} */
@Override public void handleRequest(RpcContext rpcCtx, Request1 request) {
if (request.val == 10_000)
try {
@@ -328,36 +313,48 @@ public abstract class AbstractRpcTest {
rpcCtx.sendResponse(resp1);
}
+ /** {@inheritDoc} */
@Override public String interest() {
return Request1.class.getName();
}
}
- protected static class Request2RpcProcessor implements RpcProcessor<Request2> {
+ /** */
+ private static class Request2RpcProcessor implements RpcProcessor<Request2> {
+ /** {@inheritDoc} */
@Override public void handleRequest(RpcContext rpcCtx, Request2 request) {
Response2 resp2 = new Response2();
resp2.val = request.val + 1;
rpcCtx.sendResponse(resp2);
}
+ /** {@inheritDoc} */
@Override public String interest() {
return Request2.class.getName();
}
}
+ /** */
private static class Request1 implements Message {
+ /** */
int val;
}
+ /** */
private static class Request2 implements Message {
+ /** */
int val;
}
+ /** */
private static class Response1 implements Message {
+ /** */
int val;
}
+ /** */
private static class Response2 implements Message {
+ /** */
int val;
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
index 0ddffc0..52f1ec8 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.raft.jraft.rpc;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
@@ -34,23 +33,20 @@ import org.apache.ignite.raft.jraft.util.Endpoint;
*/
public class IgniteRpcTest extends AbstractRpcTest {
/**
- * The logger.
- */
- private static final IgniteLogger LOG = IgniteLogger.forClass(IgniteRpcTest.class);
-
- /**
* Serialization registry.
*/
private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
/** The counter. */
- private AtomicInteger cntr = new AtomicInteger();
+ private final AtomicInteger cntr = new AtomicInteger();
- @Override public RpcServer createServer(Endpoint endpoint) {
+ /** {@inheritDoc} */
+ @Override public RpcServer<?> createServer(Endpoint endpoint) {
return new TestIgniteRpcServer(endpoint, new NodeManager());
}
- @Override public RpcClient createClient() {
+ /** {@inheritDoc} */
+ @Override public RpcClient createClient0() {
int i = cntr.incrementAndGet();
ClusterService service = createService("client" + i, endpoint.getPort() - i, List.of(endpoint.toString()));