You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/08/12 06:25:35 UTC
[ignite-3] branch main updated: IGNITE-15196 Fail invoke futures on
network stop (#269)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0507f4a IGNITE-15196 Fail invoke futures on network stop (#269)
0507f4a is described below
commit 0507f4a9dbf6cd6fd143d30d19ea056f04c70372
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Thu Aug 12 09:25:28 2021 +0300
IGNITE-15196 Fail invoke futures on network stop (#269)
---
.../scalecube/ITScaleCubeNetworkMessagingTest.java | 42 ++++++++++++++++++++++
.../ScaleCubeDirectMarshallerTransport.java | 5 +--
.../scalecube/ScaleCubeMessagingService.java | 33 ++++-------------
3 files changed, 52 insertions(+), 28 deletions(-)
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index b2037ec..1b5bc5a 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -25,12 +25,14 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.transport.api.Transport;
import org.apache.ignite.internal.network.NetworkMessageTypes;
+import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
@@ -53,8 +55,10 @@ import reactor.core.publisher.Mono;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -220,6 +224,44 @@ class ITScaleCubeNetworkMessagingTest {
}
/**
+ * Tests that if the network component is stopped while waiting for a response to an "invoke" call,
+ * the corresponding future completes exceptionally.
+ */
+ @Test
+ public void testInvokeDuringStop() throws InterruptedException {
+ testCluster = new Cluster(2);
+ testCluster.startAwait();
+
+ ClusterService member0 = testCluster.members.get(0);
+ ClusterService member1 = testCluster.members.get(1);
+
+ // we don't register a message listener on the receiving side, so all "invoke"s should timeout
+
+ // perform two invokes to test that multiple requests can get cancelled
+ CompletableFuture<NetworkMessage> invoke0 = member0.messagingService().invoke(
+ member1.topologyService().localMember(),
+ messageFactory.testMessage().build(),
+ 1000
+ );
+
+ CompletableFuture<NetworkMessage> invoke1 = member0.messagingService().invoke(
+ member1.topologyService().localMember(),
+ messageFactory.testMessage().build(),
+ 1000
+ );
+
+ member0.stop();
+
+ ExecutionException e = assertThrows(ExecutionException.class, () -> invoke0.get(1, TimeUnit.SECONDS));
+
+ assertThat(e.getCause(), instanceOf(NodeStoppingException.class));
+
+ e = assertThrows(ExecutionException.class, () -> invoke1.get(1, TimeUnit.SECONDS));
+
+ assertThat(e.getCause(), instanceOf(NodeStoppingException.class));
+ }
+
+ /**
* Serializable message that belongs to the {@link NetworkMessageTypes} message group.
*/
private static class MockNetworkMessage implements NetworkMessage, Serializable {
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index 1257f8b..f641f2b 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.network.message.ScaleCubeMessage;
import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
@@ -132,8 +133,8 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
return Mono.defer(() -> {
LOG.info("Stopping {}", address);
- // Complete incoming messages observable
- sink.complete();
+ // Fail all incoming message listeners on stop
+ sink.error(new NodeStoppingException());
LOG.info("Stopped {}", address);
return Mono.empty();
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index 1ee6be2..e325004 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -20,7 +20,6 @@ package org.apache.ignite.network.scalecube;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
@@ -77,12 +76,8 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
@Override public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
// TODO: IGNITE-15161 Temporarly, probably should be removed after the implementation
// TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
- if (cluster.isShutdown()) {
- CompletableFuture nodeStoppingRes = new CompletableFuture<NetworkMessage>();
- nodeStoppingRes.completeExceptionally(new NodeStoppingException());
-
- return nodeStoppingRes;
- }
+ if (cluster.isShutdown())
+ return CompletableFuture.failedFuture(new NodeStoppingException());
return cluster
.send(fromNetworkAddress(recipient.address()), Message.fromData(msg))
@@ -98,12 +93,8 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
@Override public CompletableFuture<Void> send(NetworkAddress addr, NetworkMessage msg, String correlationId) {
// TODO: IGNITE-15161 Temporarly, probably should be removed after the implementation
// TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
- if (cluster.isShutdown()) {
- CompletableFuture nodeStoppingRes = new CompletableFuture<NetworkMessage>();
- nodeStoppingRes.completeExceptionally(new NodeStoppingException());
-
- return nodeStoppingRes;
- }
+ if (cluster.isShutdown())
+ return CompletableFuture.failedFuture(new NodeStoppingException());
var message = Message
.withData(msg)
@@ -124,29 +115,19 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
@Override public CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, NetworkMessage msg, long timeout) {
// TODO: IGNITE-15161 Temporarly, probably should be removed after the implementation
// TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
- if (cluster.isShutdown()) {
- CompletableFuture nodeStoppingRes = new CompletableFuture<NetworkMessage>();
- nodeStoppingRes.completeExceptionally(new NodeStoppingException());
-
- return nodeStoppingRes;
- }
+ if (cluster.isShutdown())
+ return CompletableFuture.failedFuture(new NodeStoppingException());
var message = Message
.withData(msg)
.correlationId(UUID.randomUUID().toString())
.build();
- // TODO: IGNITE-15196 Null seems to be an unexpected result on node stopping.
return cluster
.requestResponse(fromNetworkAddress(addr), message)
.timeout(Duration.ofMillis(timeout))
.toFuture()
- .thenApply(m -> {
- if (m == null)
- throw new CompletionException(new NodeStoppingException());
- else
- return m.data();
- }); // The result can be null on node stopping.
+ .thenApply(Message::data);
}
/**