You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/08/12 06:25:35 UTC

[ignite-3] branch main updated: IGNITE-15196 Fail invoke futures on network stop (#269)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0507f4a  IGNITE-15196 Fail invoke futures on network stop (#269)
0507f4a is described below

commit 0507f4a9dbf6cd6fd143d30d19ea056f04c70372
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Thu Aug 12 09:25:28 2021 +0300

    IGNITE-15196 Fail invoke futures on network stop (#269)
---
 .../scalecube/ITScaleCubeNetworkMessagingTest.java | 42 ++++++++++++++++++++++
 .../ScaleCubeDirectMarshallerTransport.java        |  5 +--
 .../scalecube/ScaleCubeMessagingService.java       | 33 ++++-------------
 3 files changed, 52 insertions(+), 28 deletions(-)

diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index b2037ec..1b5bc5a 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -25,12 +25,14 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import io.scalecube.cluster.ClusterImpl;
 import io.scalecube.cluster.transport.api.Transport;
 import org.apache.ignite.internal.network.NetworkMessageTypes;
+import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
@@ -53,8 +55,10 @@ import reactor.core.publisher.Mono;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -220,6 +224,44 @@ class ITScaleCubeNetworkMessagingTest {
     }
 
     /**
+     * Tests that if the network component is stopped while waiting for a response to an "invoke" call,
+     * the corresponding future completes exceptionally.
+     */
+    @Test
+    public void testInvokeDuringStop() throws InterruptedException {
+        testCluster = new Cluster(2);
+        testCluster.startAwait();
+
+        ClusterService member0 = testCluster.members.get(0);
+        ClusterService member1 = testCluster.members.get(1);
+
+        // we don't register a message listener on the receiving side, so all "invoke"s should timeout
+
+        // perform two invokes to test that multiple requests can get cancelled
+        CompletableFuture<NetworkMessage> invoke0 = member0.messagingService().invoke(
+            member1.topologyService().localMember(),
+            messageFactory.testMessage().build(),
+            1000
+        );
+
+        CompletableFuture<NetworkMessage> invoke1 = member0.messagingService().invoke(
+            member1.topologyService().localMember(),
+            messageFactory.testMessage().build(),
+            1000
+        );
+
+        member0.stop();
+
+        ExecutionException e = assertThrows(ExecutionException.class, () -> invoke0.get(1, TimeUnit.SECONDS));
+
+        assertThat(e.getCause(), instanceOf(NodeStoppingException.class));
+
+        e = assertThrows(ExecutionException.class, () -> invoke1.get(1, TimeUnit.SECONDS));
+
+        assertThat(e.getCause(), instanceOf(NodeStoppingException.class));
+    }
+
+    /**
      * Serializable message that belongs to the {@link NetworkMessageTypes} message group.
      */
     private static class MockNetworkMessage implements NetworkMessage, Serializable {
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index 1257f8b..f641f2b 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.network.message.ScaleCubeMessage;
 import org.apache.ignite.internal.network.netty.ConnectionManager;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NetworkMessage;
@@ -132,8 +133,8 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
         return Mono.defer(() -> {
             LOG.info("Stopping {}", address);
 
-            // Complete incoming messages observable
-            sink.complete();
+            // Fail all incoming message listeners on stop
+            sink.error(new NodeStoppingException());
 
             LOG.info("Stopped {}", address);
             return Mono.empty();
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index 1ee6be2..e325004 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -20,7 +20,6 @@ package org.apache.ignite.network.scalecube;
 import java.time.Duration;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import io.scalecube.cluster.Cluster;
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.net.Address;
@@ -77,12 +76,8 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
     @Override public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
         // TODO: IGNITE-15161 Temporarly, probably should be removed after the implementation
         // TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
-        if (cluster.isShutdown()) {
-            CompletableFuture nodeStoppingRes = new CompletableFuture<NetworkMessage>();
-            nodeStoppingRes.completeExceptionally(new NodeStoppingException());
-
-            return nodeStoppingRes;
-        }
+        if (cluster.isShutdown())
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         return cluster
             .send(fromNetworkAddress(recipient.address()), Message.fromData(msg))
@@ -98,12 +93,8 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
     @Override public CompletableFuture<Void> send(NetworkAddress addr, NetworkMessage msg, String correlationId) {
         // TODO: IGNITE-15161 Temporarly, probably should be removed after the implementation
         // TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
-        if (cluster.isShutdown()) {
-            CompletableFuture nodeStoppingRes = new CompletableFuture<NetworkMessage>();
-            nodeStoppingRes.completeExceptionally(new NodeStoppingException());
-
-            return nodeStoppingRes;
-        }
+        if (cluster.isShutdown())
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         var message = Message
             .withData(msg)
@@ -124,29 +115,19 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
     @Override public CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, NetworkMessage msg, long timeout) {
         // TODO: IGNITE-15161 Temporarly, probably should be removed after the implementation
         // TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
-        if (cluster.isShutdown()) {
-            CompletableFuture nodeStoppingRes = new CompletableFuture<NetworkMessage>();
-            nodeStoppingRes.completeExceptionally(new NodeStoppingException());
-
-            return nodeStoppingRes;
-        }
+        if (cluster.isShutdown())
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         var message = Message
             .withData(msg)
             .correlationId(UUID.randomUUID().toString())
             .build();
 
-        // TODO: IGNITE-15196 Null seems to be an unexpected result on node stopping.
         return cluster
             .requestResponse(fromNetworkAddress(addr), message)
             .timeout(Duration.ofMillis(timeout))
             .toFuture()
-            .thenApply(m -> {
-                if (m == null)
-                    throw new CompletionException(new NodeStoppingException());
-                else
-                    return m.data();
-            }); // The result can be null on node stopping.
+            .thenApply(Message::data);
     }
 
     /**