You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/06/16 09:13:35 UTC

[ignite-3] branch main updated: IGNITE-14909 Use local address instead of external in AbstractRpcTest (#177)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d1c4f34  IGNITE-14909 Use local address instead of external in AbstractRpcTest (#177)
d1c4f34 is described below

commit d1c4f340c4deb9fd3929280db6e5119a18bf8c56
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Wed Jun 16 12:13:29 2021 +0300

    IGNITE-14909 Use local address instead of external in AbstractRpcTest (#177)
---
 .../apache/ignite/raft/jraft/rpc/RpcClient.java    |  22 ++--
 .../raft/jraft/rpc/impl/IgniteRpcClient.java       |  12 ++
 .../ignite/raft/jraft/rpc/AbstractRpcTest.java     | 139 ++++++++++-----------
 .../ignite/raft/jraft/rpc/IgniteRpcTest.java       |  14 +--
 4 files changed, 96 insertions(+), 91 deletions(-)

diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
index cb8530c..5e473c8 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcClient.java
@@ -21,26 +21,26 @@ import org.apache.ignite.raft.jraft.Lifecycle;
 import org.apache.ignite.raft.jraft.error.RemotingException;
 import org.apache.ignite.raft.jraft.option.RpcOptions;
 import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
 public interface RpcClient extends Lifecycle<RpcOptions> {
-
     /**
      * Check connection for given address. // TODO asch rename to isAlive.
      *
      * @param endpoint target address
      * @return true if there is a connection and the connection is active and writable.
      */
-    boolean checkConnection(final Endpoint endpoint);
+    boolean checkConnection(Endpoint endpoint);
 
     /**
      * Register a connect event listener for the handler.
      *
      * @param handler The handler.
      */
-    void registerConnectEventListener(final TopologyEventHandler handler);
+    void registerConnectEventListener(TopologyEventHandler handler);
 
     /**
      * Synchronous invocation.
@@ -50,7 +50,7 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
      * @param timeoutMs timeout millisecond
      * @return invoke result
      */
-    default Object invokeSync(final Endpoint endpoint, final Object request, final long timeoutMs)
+    default Object invokeSync(Endpoint endpoint, Object request, long timeoutMs)
         throws InterruptedException, RemotingException {
         return invokeSync(endpoint, request, null, timeoutMs);
     }
@@ -64,8 +64,8 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
      * @param timeoutMs timeout millisecond
      * @return invoke result
      */
-    Object invokeSync(final Endpoint endpoint, final Object request, final InvokeContext ctx,
-        final long timeoutMs) throws InterruptedException, RemotingException;
+    Object invokeSync(Endpoint endpoint, Object request, @Nullable InvokeContext ctx,
+        long timeoutMs) throws InterruptedException, RemotingException;
 
     /**
      * Asynchronous invocation with a callback.
@@ -75,8 +75,8 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
      * @param callback invoke callback
      * @param timeoutMs timeout millisecond
      */
-    default void invokeAsync(final Endpoint endpoint, final Object request, final InvokeCallback callback,
-        final long timeoutMs) throws InterruptedException, RemotingException {
+    default void invokeAsync(Endpoint endpoint, Object request, InvokeCallback callback,
+        long timeoutMs) throws InterruptedException, RemotingException {
         invokeAsync(endpoint, request, null, callback, timeoutMs);
     }
 
@@ -89,7 +89,7 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
      * @param callback invoke callback
      * @param timeoutMs timeout millisecond
      */
-    void invokeAsync(final Endpoint endpoint, final Object request, final InvokeContext ctx,
-        final InvokeCallback callback,
-        final long timeoutMs) throws InterruptedException, RemotingException;
+    void invokeAsync(Endpoint endpoint, Object request, @Nullable InvokeContext ctx,
+        InvokeCallback callback,
+        long timeoutMs) throws InterruptedException, RemotingException;
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
index 9d4c890..f0d0495 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
@@ -65,14 +65,17 @@ public class IgniteRpcClient implements RpcClientEx {
         return service;
     }
 
+    /** {@inheritDoc} */
     @Override public boolean checkConnection(Endpoint endpoint) {
         return service.topologyService().getByAddress(endpoint.toString()) != null;
     }
 
+    /** {@inheritDoc} */
     @Override public void registerConnectEventListener(TopologyEventHandler handler) {
         service.topologyService().addEventHandler(handler);
     }
 
+    /** {@inheritDoc} */
     @Override public Object invokeSync(Endpoint endpoint, Object request, InvokeContext ctx,
         long timeoutMs) throws InterruptedException, RemotingException {
         if (!checkConnection(endpoint))
@@ -112,6 +115,7 @@ public class IgniteRpcClient implements RpcClientEx {
         }
     }
 
+    /** {@inheritDoc} */
     @Override public void invokeAsync(Endpoint endpoint, Object request, InvokeContext ctx, InvokeCallback callback,
         long timeoutMs) throws InterruptedException, RemotingException {
         if (!checkConnection(endpoint))
@@ -170,10 +174,12 @@ public class IgniteRpcClient implements RpcClientEx {
         });
     }
 
+    /** {@inheritDoc} */
     @Override public boolean init(RpcOptions opts) {
         return true;
     }
 
+    /** {@inheritDoc} */
     @Override public void shutdown() {
         try {
             if (!reuse)
@@ -184,10 +190,12 @@ public class IgniteRpcClient implements RpcClientEx {
         }
     }
 
+    /** {@inheritDoc} */
     @Override public void blockMessages(BiPredicate<Object, String> predicate) {
         this.blockPred = predicate;
     }
 
+    /** {@inheritDoc} */
     @Override public void stopBlock() {
         ArrayList<Object[]> msgs = new ArrayList<>();
 
@@ -204,18 +212,22 @@ public class IgniteRpcClient implements RpcClientEx {
         }
     }
 
+    /** {@inheritDoc} */
     @Override public void recordMessages(BiPredicate<Object, String> predicate) {
         this.recordPred = predicate;
     }
 
+    /** {@inheritDoc} */
     @Override public void stopRecord() {
         this.recordPred = null;
     }
 
+    /** {@inheritDoc} */
     @Override public Queue<Object[]> recordedMessages() {
         return recordedMsgs;
     }
 
+    /** {@inheritDoc} */
     @Override public Queue<Object[]> blockedMessages() {
         return blockedMsgs;
     }
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
index bad806c..278c03d 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.ignite.raft.jraft.rpc;
 
+import java.net.Inet4Address;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
@@ -26,32 +28,34 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.raft.jraft.error.RemotingException;
-import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.raft.jraft.util.Endpoint;
 import org.apache.ignite.raft.jraft.util.Utils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import static org.apache.ignite.raft.jraft.test.TestUtils.INIT_PORT;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /**
  *
  */
 public abstract class AbstractRpcTest {
     protected Endpoint endpoint;
-    protected List<RpcServer> servers = new ArrayList<>();
 
-    @Before
-    public void setup() {
-        endpoint = new Endpoint(TestUtils.getMyIp(), INIT_PORT);
-        RpcServer server = createServer(endpoint);
+    private final List<RpcServer<?>> servers = new ArrayList<>();
+
+    private final List<RpcClient> clients = new ArrayList<>();
+
+    @BeforeEach
+    public void setup() throws UnknownHostException {
+        endpoint = new Endpoint(Inet4Address.getLocalHost().getHostAddress(), INIT_PORT);
+
+        RpcServer<?> server = createServer(endpoint);
         server.registerProcessor(new Request1RpcProcessor());
         server.registerProcessor(new Request2RpcProcessor());
         server.init(null);
@@ -59,47 +63,50 @@ public abstract class AbstractRpcTest {
         servers.add(server);
     }
 
-    @After
-    public void teardown() {
-        for (RpcServer server : servers) {
-            server.shutdown();
-        }
+    @AfterEach
+    public void tearDown() {
+        clients.forEach(RpcClient::shutdown);
+        servers.forEach(RpcServer::shutdown);
     }
 
     /**
      * @param endpoint The endpoint.
      * @return The server.
      */
-    public abstract RpcServer createServer(Endpoint endpoint);
+    public abstract RpcServer<?> createServer(Endpoint endpoint);
 
     /**
      * @return The client.
      */
-    public abstract RpcClient createClient();
+    private RpcClient createClient() {
+        RpcClient client = createClient0();
+
+        clients.add(client);
+
+        return client;
+    }
+
+    public abstract RpcClient createClient0();
 
     @Test
     public void testConnection() {
         RpcClient client = createClient();
 
         assertTrue(client.checkConnection(endpoint));
-
-        client.shutdown();
     }
 
     @Test
-    public void testSyncProcessing() throws RemotingException, InterruptedException {
+    public void testSyncProcessing() throws Exception {
         RpcClient client = createClient();
         Response1 resp1 = (Response1) client.invokeSync(endpoint, new Request1(), new InvokeContext(), 5000);
         assertNotNull(resp1);
 
         Response2 resp2 = (Response2) client.invokeSync(endpoint, new Request2(), new InvokeContext(), 5000);
         assertNotNull(resp2);
-
-        client.shutdown();
     }
 
     @Test
-    public void testAsyncProcessing() throws RemotingException, InterruptedException {
+    public void testAsyncProcessing() throws Exception {
         RpcClient client = createClient();
 
         CountDownLatch l1 = new CountDownLatch(1);
@@ -119,35 +126,27 @@ public abstract class AbstractRpcTest {
         }, 5000);
         l2.await(5_000, TimeUnit.MILLISECONDS);
         assertNotNull(resp2);
-
-        client.shutdown();
     }
 
     @Test
-    public void testDisconnect() throws Exception {
+    public void testDisconnect() {
         RpcClient client1 = createClient();
         RpcClient client2 = createClient();
 
-        try {
-            assertTrue(client1.checkConnection(endpoint));
-            assertTrue(client2.checkConnection(endpoint));
+        assertTrue(client1.checkConnection(endpoint));
+        assertTrue(client2.checkConnection(endpoint));
 
-            servers.get(0).shutdown();
+        servers.get(0).shutdown();
 
-            assertTrue(waitForTopology(client1, 2, 5_000));
-            assertTrue(waitForTopology(client2, 2, 5_000));
+        assertTrue(waitForTopology(client1, 2, 5_000));
+        assertTrue(waitForTopology(client2, 2, 5_000));
 
-            assertFalse(client1.checkConnection(endpoint));
-            assertFalse(client2.checkConnection(endpoint));
-        }
-        finally {
-            client1.shutdown();
-            client2.shutdown();
-        }
+        assertFalse(client1.checkConnection(endpoint));
+        assertFalse(client2.checkConnection(endpoint));
     }
 
     @Test
-    public void testRecordedSync() throws RemotingException, InterruptedException {
+    public void testRecordedSync() throws Exception {
         RpcClientEx client1 = (RpcClientEx) createClient();
         client1.recordMessages((a, b) -> true);
 
@@ -166,12 +165,10 @@ public abstract class AbstractRpcTest {
         assertTrue(recorded.poll()[0] instanceof Response1);
         assertTrue(recorded.poll()[0] instanceof Request2);
         assertTrue(recorded.poll()[0] instanceof Response2);
-
-        client1.shutdown();
     }
 
     @Test
-    public void testRecordedSyncTimeout() throws RemotingException, InterruptedException {
+    public void testRecordedSyncTimeout() {
         RpcClientEx client1 = (RpcClientEx) createClient();
         client1.recordMessages((a, b) -> true);
 
@@ -184,7 +181,7 @@ public abstract class AbstractRpcTest {
 
             fail();
         }
-        catch (Exception e) {
+        catch (Exception ignored) {
             // Expected.
         }
 
@@ -192,12 +189,10 @@ public abstract class AbstractRpcTest {
 
         assertEquals(1, recorded.size());
         assertTrue(recorded.poll()[0] instanceof Request1);
-
-        client1.shutdown();
     }
 
     @Test
-    public void testRecordedAsync() throws RemotingException, InterruptedException {
+    public void testRecordedAsync() throws Exception {
         RpcClientEx client1 = (RpcClientEx) createClient();
         client1.recordMessages((a, b) -> true);
 
@@ -213,12 +208,10 @@ public abstract class AbstractRpcTest {
         Queue<Object[]> recorded = client1.recordedMessages();
 
         assertEquals(4, recorded.size());
-
-        client1.shutdown();
     }
 
     @Test
-    public void testRecordedAsyncTimeout() throws RemotingException, InterruptedException {
+    public void testRecordedAsyncTimeout() {
         RpcClientEx client1 = (RpcClientEx) createClient();
         client1.recordMessages((a, b) -> true);
 
@@ -227,7 +220,7 @@ public abstract class AbstractRpcTest {
         try {
             Request1 request = new Request1();
             request.val = 10_000;
-            CompletableFuture fut = new CompletableFuture<>();
+            CompletableFuture<Object> fut = new CompletableFuture<>();
 
             client1.invokeAsync(endpoint, request, null, (result, err) -> {
                 if (err == null)
@@ -240,7 +233,7 @@ public abstract class AbstractRpcTest {
 
             fail();
         }
-        catch (Exception e) {
+        catch (Exception ignored) {
             // Expected.
         }
 
@@ -248,8 +241,6 @@ public abstract class AbstractRpcTest {
 
         assertEquals(1, recorded.size());
         assertTrue(recorded.poll()[0] instanceof Request1);
-
-        client1.shutdown();
     }
 
     @Test
@@ -279,8 +270,6 @@ public abstract class AbstractRpcTest {
         client1.stopBlock();
 
         resp.get(5_000, TimeUnit.MILLISECONDS);
-
-        client1.shutdown();
     }
 
     @Test
@@ -290,13 +279,9 @@ public abstract class AbstractRpcTest {
 
         assertTrue(client1.checkConnection(endpoint));
 
-        CompletableFuture resp = new CompletableFuture();
+        CompletableFuture<Object> resp = new CompletableFuture<>();
 
-        client1.invokeAsync(endpoint, new Request1(), new InvokeCallback() {
-            @Override public void complete(Object result, Throwable err) {
-                resp.complete(result);
-            }
-        }, 30_000);
+        client1.invokeAsync(endpoint, new Request1(), (result, err) -> resp.complete(result), 30_000);
 
         Thread.sleep(500);
 
@@ -309,11 +294,11 @@ public abstract class AbstractRpcTest {
         client1.stopBlock();
 
         resp.get(5_000, TimeUnit.MILLISECONDS);
-
-        client1.shutdown();
     }
 
-    protected static class Request1RpcProcessor implements RpcProcessor<Request1> {
+    /** */
+    private static class Request1RpcProcessor implements RpcProcessor<Request1> {
+        /** {@inheritDoc} */
         @Override public void handleRequest(RpcContext rpcCtx, Request1 request) {
             if (request.val == 10_000)
                 try {
@@ -328,36 +313,48 @@ public abstract class AbstractRpcTest {
             rpcCtx.sendResponse(resp1);
         }
 
+        /** {@inheritDoc} */
         @Override public String interest() {
             return Request1.class.getName();
         }
     }
 
-    protected static class Request2RpcProcessor implements RpcProcessor<Request2> {
+    /** */
+    private static class Request2RpcProcessor implements RpcProcessor<Request2> {
+        /** {@inheritDoc} */
         @Override public void handleRequest(RpcContext rpcCtx, Request2 request) {
             Response2 resp2 = new Response2();
             resp2.val = request.val + 1;
             rpcCtx.sendResponse(resp2);
         }
 
+        /** {@inheritDoc} */
         @Override public String interest() {
             return Request2.class.getName();
         }
     }
 
+    /** */
     private static class Request1 implements Message {
+        /** */
         int val;
     }
 
+    /** */
     private static class Request2 implements Message {
+        /** */
         int val;
     }
 
+    /** */
     private static class Response1 implements Message {
+        /** */
         int val;
     }
 
+    /** */
     private static class Response2 implements Message {
+        /** */
         int val;
     }
 
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
index 0ddffc0..52f1ec8 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.raft.jraft.rpc;
 
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessageSerializationRegistryImpl;
@@ -34,23 +33,20 @@ import org.apache.ignite.raft.jraft.util.Endpoint;
  */
 public class IgniteRpcTest extends AbstractRpcTest {
     /**
-     * The logger.
-     */
-    private static final IgniteLogger LOG = IgniteLogger.forClass(IgniteRpcTest.class);
-
-    /**
      * Serialization registry.
      */
     private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
 
     /** The counter. */
-    private AtomicInteger cntr = new AtomicInteger();
+    private final AtomicInteger cntr = new AtomicInteger();
 
-    @Override public RpcServer createServer(Endpoint endpoint) {
+    /** {@inheritDoc} */
+    @Override public RpcServer<?> createServer(Endpoint endpoint) {
         return new TestIgniteRpcServer(endpoint, new NodeManager());
     }
 
-    @Override public RpcClient createClient() {
+    /** {@inheritDoc} */
+    @Override public RpcClient createClient0() {
         int i = cntr.incrementAndGet();
 
         ClusterService service = createService("client" + i, endpoint.getPort() - i, List.of(endpoint.toString()));