You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/05/05 15:41:13 UTC
[ignite-3] branch main updated: IGNITE-14682 Fix sending messages
from a node to itself (#119)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8bec45a IGNITE-14682 Fix sending messages from a node to itself (#119)
8bec45a is described below
commit 8bec45a24104089e71dd742ff8abec49a5195718
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Wed May 5 18:39:01 2021 +0300
IGNITE-14682 Fix sending messages from a node to itself (#119)
---
.../org/apache/ignite/network/TestMessage.java | 5 ++
.../scalecube/ITScaleCubeNetworkMessagingTest.java | 83 +++++++++++++++++-
.../scalecube/DelegatingTransportFactory.java | 99 ++++++++++++++++++++++
.../scalecube/ScaleCubeClusterServiceFactory.java | 9 +-
.../scalecube/ScaleCubeTopologyService.java | 11 ++-
5 files changed, 195 insertions(+), 12 deletions(-)
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
index 3ee3594..25faecc 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
@@ -19,6 +19,7 @@
package org.apache.ignite.network;
import java.io.Serializable;
+import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
@@ -43,6 +44,10 @@ public class TestMessage implements NetworkMessage, Serializable {
this.map = map;
}
+ public TestMessage(String msg) {
+ this(msg, Collections.emptyMap());
+ }
+
public String msg() {
return msg;
}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index c241dcf..904dc9b 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -16,19 +16,19 @@
*/
package org.apache.ignite.network.scalecube;
-import io.scalecube.cluster.ClusterImpl;
-import io.scalecube.cluster.transport.api.Transport;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import io.scalecube.cluster.ClusterImpl;
+import io.scalecube.cluster.transport.api.Transport;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
@@ -48,6 +48,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/** */
class ITScaleCubeNetworkMessagingTest {
+ /**
+ * Test cluster.
+ * <p>
+ * Each test should create its own cluster with the required number of nodes.
+ */
private Cluster testCluster;
/** */
@@ -78,7 +83,7 @@ class ITScaleCubeNetworkMessagingTest {
testCluster.startAwait();
- TestMessage testMessage = new TestMessage("Message from Alice", Collections.emptyMap());
+ TestMessage testMessage = new TestMessage("Message from Alice");
ClusterService alice = testCluster.members.get(0);
@@ -114,6 +119,76 @@ class ITScaleCubeNetworkMessagingTest {
}
/**
+ * Sends a message from a node to itself and verifies that it gets delivered successfully.
+ */
+ @Test
+ public void testSendMessageToSelf() throws Exception {
+ testCluster = new Cluster(1);
+ testCluster.startAwait();
+
+ ClusterService member = testCluster.members.get(0);
+
+ ClusterNode self = member.topologyService().localMember();
+
+ class Data {
+ private final NetworkMessage message;
+ private final ClusterNode sender;
+ private final String correlationId;
+
+ private Data(NetworkMessage message, ClusterNode sender, String correlationId) {
+ this.message = message;
+ this.sender = sender;
+ this.correlationId = correlationId;
+ }
+ }
+
+ var dataFuture = new CompletableFuture<Data>();
+
+ member.messagingService().addMessageHandler(
+ (message, sender, correlationId) -> dataFuture.complete(new Data(message, sender, correlationId))
+ );
+
+ var requestMessage = new TestMessage("request");
+ var correlationId = "foobar";
+
+ member.messagingService().send(self, requestMessage, correlationId);
+
+ Data actualData = dataFuture.get(3, TimeUnit.SECONDS);
+
+ assertThat(actualData.message, is(requestMessage));
+ assertThat(actualData.sender, is(self));
+ assertThat(actualData.correlationId, is(correlationId));
+ }
+
+ /**
+ * Sends a message from a node to itself and awaits the response.
+ */
+ @Test
+ public void testInvokeMessageToSelf() throws Exception {
+ testCluster = new Cluster(1);
+ testCluster.startAwait();
+
+ ClusterService member = testCluster.members.get(0);
+
+ ClusterNode self = member.topologyService().localMember();
+
+ var requestMessage = new TestMessage("request");
+ var responseMessage = new TestMessage("response");
+
+ member.messagingService().addMessageHandler((message, sender, correlationId) -> {
+ if (message.equals(requestMessage))
+ member.messagingService().send(self, responseMessage, correlationId);
+ });
+
+ TestMessage actualResponseMessage = member.messagingService()
+ .invoke(self, requestMessage, 1000)
+ .thenApply(TestMessage.class::cast)
+ .get(3, TimeUnit.SECONDS);
+
+ assertThat(actualResponseMessage, is(responseMessage));
+ }
+
+ /**
* Test shutdown.
* @param forceful Whether shutdown should be forceful.
* @throws Exception If failed.
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/DelegatingTransportFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/DelegatingTransportFactory.java
new file mode 100644
index 0000000..7e61653
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/DelegatingTransportFactory.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import java.util.Objects;
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.cluster.transport.api.TransportConfig;
+import io.scalecube.cluster.transport.api.TransportFactory;
+import io.scalecube.net.Address;
+import io.scalecube.transport.netty.tcp.TcpTransportFactory;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * This class is a hack designed to handle a single use-case: sending a message from a node to itself. In all other
+ * cases it should behave as the default ScaleCube transport factory.
+ */
+class DelegatingTransportFactory implements TransportFactory {
+ /** */
+ private final ScaleCubeMessagingService messagingService;
+
+ /**
+ * @param messagingService Messaging service.
+ */
+ DelegatingTransportFactory(ScaleCubeMessagingService messagingService) {
+ this.messagingService = messagingService;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Transport createTransport(TransportConfig config) {
+ var delegateFactory = TransportFactory.INSTANCE == null ? new TcpTransportFactory() : TransportFactory.INSTANCE;
+
+ Transport delegate = delegateFactory.createTransport(config);
+
+ return new Transport() {
+ @Override public Address address() {
+ return delegate.address();
+ }
+
+ @Override public Mono<Transport> start() {
+ return delegate.start().thenReturn(this);
+ }
+
+ @Override public Mono<Void> stop() {
+ return delegate.stop();
+ }
+
+ @Override public boolean isStopped() {
+ return delegate.isStopped();
+ }
+
+ @Override public Mono<Void> send(Address address, Message message) {
+ return delegate.send(address, message);
+ }
+
+ @Override public Flux<Message> listen() {
+ return delegate.listen();
+ }
+
+ @Override public Mono<Message> requestResponse(Address address, Message request) {
+ return address.equals(address()) ?
+ requestResponseToSelf(request) :
+ delegate.requestResponse(address, request);
+ }
+
+ private Mono<Message> requestResponseToSelf(Message request) {
+ Objects.requireNonNull(request, "request must be not null");
+ Objects.requireNonNull(request.correlationId(), "correlationId must be not null");
+
+ Mono<Message> result = listen()
+ .filter(resp -> resp.correlationId() != null)
+ .filter(resp -> resp.correlationId().equals(request.correlationId()))
+ .next();
+
+ // Manually fire the message event instead of sending it over the transport: a sent request would be received
+ // immediately by the listener above (same correlation ID) and be mistaken for the event handlers' response.
+ messagingService.fireEvent(request);
+
+ return result;
+ }
+ };
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 74af277..e664a40 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -17,13 +17,13 @@
package org.apache.ignite.network.scalecube;
+import java.util.List;
+import java.util.stream.Collectors;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
-import java.util.List;
-import java.util.stream.Collectors;
import org.apache.ignite.network.AbstractClusterService;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
@@ -39,6 +39,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
@Override public ClusterService createClusterService(ClusterLocalConfiguration context) {
var topologyService = new ScaleCubeTopologyService();
var messagingService = new ScaleCubeMessagingService(topologyService);
+ var transportFactory = new DelegatingTransportFactory(messagingService);
var cluster = new ClusterImpl()
.handler(cl -> new ClusterMessageHandler() {
@@ -53,7 +54,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
}
})
.config(opts -> opts.memberAlias(context.getName()))
- .transport(opts -> opts.port(context.getPort()))
+ .transport(opts -> opts.port(context.getPort()).transportFactory(transportFactory))
.membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())).suspicionMult(1));
// resolve cyclic dependencies
@@ -80,7 +81,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
* @param addresses "host:port" formatted strings.
* @return List of addresses.
*/
- private List<Address> parseAddresses(List<String> addresses) {
+ private static List<Address> parseAddresses(List<String> addresses) {
try {
return addresses.stream()
.map(Address::from)
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
index 31eaaf7..70bbbde 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -16,12 +16,12 @@
*/
package org.apache.ignite.network.scalecube;
-import io.scalecube.cluster.Member;
-import io.scalecube.cluster.membership.MembershipEvent;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import io.scalecube.cluster.Member;
+import io.scalecube.cluster.membership.MembershipEvent;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.AbstractTopologyService;
import org.apache.ignite.network.ClusterNode;
@@ -42,9 +42,10 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
* Sets the ScaleCube's local {@link Member}.
*/
void setLocalMember(Member member) {
- this.localMember = fromMember(member);
+ localMember = fromMember(member);
- this.members.put(localMember.name(), localMember);
+ // emit an artificial event as if the local member has joined the topology (ScaleCube doesn't do that)
+ onMembershipEvent(MembershipEvent.createAdded(member, null, System.currentTimeMillis()));
}
/**
@@ -109,6 +110,8 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
/** {@inheritDoc} */
@Override public ClusterNode localMember() {
+ assert localMember != null : "Cluster has not been started";
+
return localMember;
}