You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/05/05 15:41:13 UTC

[ignite-3] branch main updated: IGNITE-14682 Fix sending messages from a node to itself (#119)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8bec45a  IGNITE-14682 Fix sending messages from a node to itself (#119)
8bec45a is described below

commit 8bec45a24104089e71dd742ff8abec49a5195718
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Wed May 5 18:39:01 2021 +0300

    IGNITE-14682 Fix sending messages from a node to itself (#119)
---
 .../org/apache/ignite/network/TestMessage.java     |  5 ++
 .../scalecube/ITScaleCubeNetworkMessagingTest.java | 83 +++++++++++++++++-
 .../scalecube/DelegatingTransportFactory.java      | 99 ++++++++++++++++++++++
 .../scalecube/ScaleCubeClusterServiceFactory.java  |  9 +-
 .../scalecube/ScaleCubeTopologyService.java        | 11 ++-
 5 files changed, 195 insertions(+), 12 deletions(-)

diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
index 3ee3594..25faecc 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
@@ -19,6 +19,7 @@
 package org.apache.ignite.network;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
@@ -43,6 +44,10 @@ public class TestMessage implements NetworkMessage, Serializable {
         this.map = map;
     }
 
+    public TestMessage(String msg) {
+        this(msg, Collections.emptyMap());
+    }
+
     public String msg() {
         return msg;
     }
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index c241dcf..904dc9b 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -16,19 +16,19 @@
  */
 package org.apache.ignite.network.scalecube;
 
-import io.scalecube.cluster.ClusterImpl;
-import io.scalecube.cluster.transport.api.Transport;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import io.scalecube.cluster.ClusterImpl;
+import io.scalecube.cluster.transport.api.Transport;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
@@ -48,6 +48,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** */
 class ITScaleCubeNetworkMessagingTest {
+    /**
+     * Test cluster.
+     * <p>
+     * Each test should create its own cluster with the required number of nodes.
+     */
     private Cluster testCluster;
 
     /** */
@@ -78,7 +83,7 @@ class ITScaleCubeNetworkMessagingTest {
 
         testCluster.startAwait();
 
-        TestMessage testMessage = new TestMessage("Message from Alice", Collections.emptyMap());
+        TestMessage testMessage = new TestMessage("Message from Alice");
 
         ClusterService alice = testCluster.members.get(0);
 
@@ -114,6 +119,76 @@ class ITScaleCubeNetworkMessagingTest {
     }
 
     /**
+     * Sends a message from a node to itself and verifies that it gets delivered successfully.
+     */
+    @Test
+    public void testSendMessageToSelf() throws Exception {
+        testCluster = new Cluster(1);
+        testCluster.startAwait();
+
+        ClusterService member = testCluster.members.get(0);
+
+        ClusterNode self = member.topologyService().localMember();
+
+        class Data {
+            private final NetworkMessage message;
+            private final ClusterNode sender;
+            private final String correlationId;
+
+            private Data(NetworkMessage message, ClusterNode sender, String correlationId) {
+                this.message = message;
+                this.sender = sender;
+                this.correlationId = correlationId;
+            }
+        }
+
+        var dataFuture = new CompletableFuture<Data>();
+
+        member.messagingService().addMessageHandler(
+            (message, sender, correlationId) -> dataFuture.complete(new Data(message, sender, correlationId))
+        );
+
+        var requestMessage = new TestMessage("request");
+        var correlationId = "foobar";
+
+        member.messagingService().send(self, requestMessage, correlationId);
+
+        Data actualData = dataFuture.get(3, TimeUnit.SECONDS);
+
+        assertThat(actualData.message, is(requestMessage));
+        assertThat(actualData.sender, is(self));
+        assertThat(actualData.correlationId, is(correlationId));
+    }
+
+    /**
+     * Sends a messages from a node to itself and awaits the response.
+     */
+    @Test
+    public void testInvokeMessageToSelf() throws Exception {
+        testCluster = new Cluster(1);
+        testCluster.startAwait();
+
+        ClusterService member = testCluster.members.get(0);
+
+        ClusterNode self = member.topologyService().localMember();
+
+        var requestMessage = new TestMessage("request");
+        var responseMessage = new TestMessage("response");
+
+        member.messagingService().addMessageHandler((message, sender, correlationId) -> {
+            if (message.equals(requestMessage))
+                member.messagingService().send(self, responseMessage, correlationId);
+        });
+
+        TestMessage actualResponseMessage = member.messagingService()
+            .invoke(self, requestMessage, 1000)
+            .thenApply(TestMessage.class::cast)
+            .get(3, TimeUnit.SECONDS);
+
+        assertThat(actualResponseMessage, is(responseMessage));
+    }
+
+    /**
      * Test shutdown.
      * @param forceful Whether shutdown should be forceful.
      * @throws Exception If failed.
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/DelegatingTransportFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/DelegatingTransportFactory.java
new file mode 100644
index 0000000..7e61653
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/DelegatingTransportFactory.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import java.util.Objects;
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.cluster.transport.api.TransportConfig;
+import io.scalecube.cluster.transport.api.TransportFactory;
+import io.scalecube.net.Address;
+import io.scalecube.transport.netty.tcp.TcpTransportFactory;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * This class is a hack designed to handle a single use-case: sending a message from a node to itself. In all other
+ * cases it should behave as the default ScaleCube transport factory.
+ */
+class DelegatingTransportFactory implements TransportFactory {
+    /** */
+    private final ScaleCubeMessagingService messagingService;
+
+    /**
+     * @param messagingService Messaging service.
+     */
+    DelegatingTransportFactory(ScaleCubeMessagingService messagingService) {
+        this.messagingService = messagingService;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Transport createTransport(TransportConfig config) {
+        var delegateFactory = TransportFactory.INSTANCE == null ? new TcpTransportFactory() : TransportFactory.INSTANCE;
+
+        Transport delegate = delegateFactory.createTransport(config);
+
+        return new Transport() {
+            @Override public Address address() {
+                return delegate.address();
+            }
+
+            @Override public Mono<Transport> start() {
+                return delegate.start().thenReturn(this);
+            }
+
+            @Override public Mono<Void> stop() {
+                return delegate.stop();
+            }
+
+            @Override public boolean isStopped() {
+                return delegate.isStopped();
+            }
+
+            @Override public Mono<Void> send(Address address, Message message) {
+                return delegate.send(address, message);
+            }
+
+            @Override public Flux<Message> listen() {
+                return delegate.listen();
+            }
+
+            @Override public Mono<Message> requestResponse(Address address, Message request) {
+                return address.equals(address()) ?
+                    requestResponseToSelf(request) :
+                    delegate.requestResponse(address, request);
+            }
+
+            private Mono<Message> requestResponseToSelf(Message request) {
+                Objects.requireNonNull(request, "request must be not null");
+                Objects.requireNonNull(request.correlationId(), "correlationId must be not null");
+
+                Mono<Message> result = listen()
+                    .filter(resp -> resp.correlationId() != null)
+                    .filter(resp -> resp.correlationId().equals(request.correlationId()))
+                    .next();
+
+                // manually fire the message event instead of sending it, because otherwise it will be received
+                // immediately, replacing the response that might have been sent by the event handlers.
+                messagingService.fireEvent(request);
+
+                return result;
+            }
+        };
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 74af277..e664a40 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -17,13 +17,13 @@
 
 package org.apache.ignite.network.scalecube;
 
+import java.util.List;
+import java.util.stream.Collectors;
 import io.scalecube.cluster.ClusterImpl;
 import io.scalecube.cluster.ClusterMessageHandler;
 import io.scalecube.cluster.membership.MembershipEvent;
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.net.Address;
-import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.ignite.network.AbstractClusterService;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterService;
@@ -39,6 +39,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
     @Override public ClusterService createClusterService(ClusterLocalConfiguration context) {
         var topologyService = new ScaleCubeTopologyService();
         var messagingService = new ScaleCubeMessagingService(topologyService);
+        var transportFactory = new DelegatingTransportFactory(messagingService);
 
         var cluster = new ClusterImpl()
             .handler(cl -> new ClusterMessageHandler() {
@@ -53,7 +54,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
                 }
             })
             .config(opts -> opts.memberAlias(context.getName()))
-            .transport(opts -> opts.port(context.getPort()))
+            .transport(opts -> opts.port(context.getPort()).transportFactory(transportFactory))
             .membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())).suspicionMult(1));
 
         // resolve cyclic dependencies
@@ -80,7 +81,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
      * @param addresses "host:port" formatted strings.
      * @return List of addresses.
      */
-    private List<Address> parseAddresses(List<String> addresses) {
+    private static List<Address> parseAddresses(List<String> addresses) {
         try {
             return addresses.stream()
                 .map(Address::from)
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
index 31eaaf7..70bbbde 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -16,12 +16,12 @@
  */
 package org.apache.ignite.network.scalecube;
 
-import io.scalecube.cluster.Member;
-import io.scalecube.cluster.membership.MembershipEvent;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import io.scalecube.cluster.Member;
+import io.scalecube.cluster.membership.MembershipEvent;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.AbstractTopologyService;
 import org.apache.ignite.network.ClusterNode;
@@ -42,9 +42,10 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
      * Sets the ScaleCube's local {@link Member}.
      */
     void setLocalMember(Member member) {
-        this.localMember = fromMember(member);
+        localMember = fromMember(member);
 
-        this.members.put(localMember.name(), localMember);
+        // emit an artificial event as if the local member has joined the topology (ScaleCube doesn't do that)
+        onMembershipEvent(MembershipEvent.createAdded(member, null, System.currentTimeMillis()));
     }
 
     /**
@@ -109,6 +110,8 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
 
     /** {@inheritDoc} */
     @Override public ClusterNode localMember() {
+        assert localMember != null : "Cluster has not been started";
+
         return localMember;
     }