You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/02/25 15:20:55 UTC
[ignite-3] branch main updated: IGNITE-14110 Networking module
basic API and implementation - #53
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 552d887 IGNITE-14110 Networking module basic API and implementation - #53
552d887 is described below
commit 552d887bee5faad3fb26644fd3c75b3bb57b00b2
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Thu Feb 25 18:20:24 2021 +0300
IGNITE-14110 Networking module basic API and implementation - #53
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
modules/network/pom.xml | 99 +++++++++++++++++++
.../ITScaleCubeNetworkClusterMessagingTest.java | 97 ++++++++++++++++++
.../ignite/network/scalecube/TestMessage.java | 55 +++++++++++
.../scalecube/TestNetworkHandlersProvider.java | 63 ++++++++++++
.../ignite/network/MessageHandlerHolder.java | 59 +++++++++++
.../org/apache/ignite/network/NetworkCluster.java | 70 +++++++++++++
.../ignite/network/NetworkClusterEventHandler.java | 37 +++++++
.../ignite/network/NetworkClusterFactory.java | 75 ++++++++++++++
.../ignite/network/NetworkHandlersProvider.java | 36 +++++++
.../org/apache/ignite/network/NetworkMember.java | 63 ++++++++++++
.../org/apache/ignite/network/NetworkMessage.java | 59 +++++++++++
.../ignite/network/NetworkMessageHandler.java | 27 +++++
.../network/scalecube/ScaleCubeMemberResolver.java | 64 ++++++++++++
.../network/scalecube/ScaleCubeMessageHandler.java | 100 +++++++++++++++++++
.../network/scalecube/ScaleCubeNetworkCluster.java | 110 +++++++++++++++++++++
parent/pom.xml | 7 ++
pom.xml | 1 +
17 files changed, 1022 insertions(+)
diff --git a/modules/network/pom.xml b/modules/network/pom.xml
new file mode 100644
index 0000000..bd8407e
--- /dev/null
+++ b/modules/network/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file of the Ignite network module.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>ignite-network</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+
+ <dependencies>
+ <!-- Internal module dependencies. -->
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Third-party dependencies. -->
+ <dependency>
+ <groupId>io.scalecube</groupId>
+ <artifactId>scalecube-cluster</artifactId>
+ </dependency>
+
+ <!-- Test dependencies. -->
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration-annotation-processor</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.1</version>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java
new file mode 100644
index 0000000..1f571d0
--- /dev/null
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network.scalecube;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.ignite.network.NetworkCluster;
+import org.apache.ignite.network.NetworkMember;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.MessageHandlerHolder;
+import org.apache.ignite.network.NetworkClusterFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/** */
+class ITScaleCubeNetworkClusterMessagingTest {
+ /** */
+ private final Queue<NetworkCluster> startedMembers = new ConcurrentLinkedQueue<>();
+
+ /** */
+ @AfterEach
+ public void afterEach() throws Exception {
+ Iterator<NetworkCluster> iterator = startedMembers.iterator();
+
+ while (iterator.hasNext()) {
+ iterator.next().shutdown();
+
+ iterator.remove();
+ }
+ }
+
+ /** */
+ @Test
+ public void messageWasSentToAllMembersSuccessfully() {
+ //Given: Three started member which are gathered to cluster.
+ List<String> addresses = List.of("localhost:3344", "localhost:3345", "localhost:3346");
+
+ NetworkCluster alice = startMember("Alice", 3344, addresses);
+ NetworkCluster bob = startMember("Bob", 3345, addresses);
+ NetworkCluster carol = startMember("Carol", 3346, addresses);
+
+ TestMessage sentMessage = new TestMessage("Message from Alice");
+
+ //When: Send one message to all members in cluster.
+ for (NetworkMember member : alice.allMembers()) {
+ System.out.println("SEND : " + member);
+
+ alice.weakSend(member, sentMessage);
+ }
+
+ //Then: All members successfully received message.
+ assertThat(getLastMessage(alice).data(), is(sentMessage));
+ assertThat(getLastMessage(bob).data(), is(sentMessage));
+ assertThat(getLastMessage(carol).data(), is(sentMessage));
+ }
+
+ /** */
+ private NetworkMessage getLastMessage(NetworkCluster alice) {
+ return TestNetworkHandlersProvider.MESSAGE_STORAGE.get(alice.localMember().name());
+ }
+
+ /**
+ * @return Started member.
+ */
+ private NetworkCluster startMember(String name, int port, List<String> addresses) {
+ NetworkCluster member = new NetworkClusterFactory(name, port, addresses)
+ .startScaleCubeBasedCluster(new ScaleCubeMemberResolver(), new MessageHandlerHolder());
+
+ member.addHandlersProvider(new TestNetworkHandlersProvider(name));
+
+ System.out.println("-----" + name + " started");
+
+ startedMembers.add(member);
+
+ return member;
+ }
+
+}
\ No newline at end of file
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessage.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessage.java
new file mode 100644
index 0000000..8420fea
--- /dev/null
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.ignite.network.scalecube;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Simple serializable payload used by the network integration tests. Two messages are equal when their texts
+ * are equal.
+ */
+class TestMessage implements Serializable {
+    /** Text carried by the message. */
+    private final String msg;
+
+    /**
+     * @param msg Text carried by the message.
+     */
+    public TestMessage(String msg) {
+        this.msg = msg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        // Only instances of exactly the same class can be equal.
+        if (o != null && o.getClass() == getClass())
+            return Objects.equals(msg, ((TestMessage)o).msg);
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        // Yields the same value as Objects.hash(msg) for a single element.
+        return 31 + Objects.hashCode(msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "TestMessage{msg='" + msg + "'}";
+    }
+}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java
new file mode 100644
index 0000000..6f5718a
--- /dev/null
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.ignite.network.scalecube;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.network.NetworkClusterEventHandler;
+import org.apache.ignite.network.NetworkHandlersProvider;
+import org.apache.ignite.network.NetworkMember;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+
+/**
+ * Test {@link NetworkHandlersProvider} which remembers the last message received by a member and prints
+ * received messages and cluster events to the standard output.
+ */
+class TestNetworkHandlersProvider implements NetworkHandlersProvider {
+    /**
+     * Last received message by the name of the member which received it. The map is shared by all provider
+     * instances; the reference is final so that it cannot be replaced while handlers are writing to it.
+     */
+    public static final Map<String, NetworkMessage> MESSAGE_STORAGE = new ConcurrentHashMap<>();
+
+    /** Name of the member this provider is registered on. */
+    private final String localName;
+
+    /**
+     * @param name Name of the member this provider is registered on.
+     */
+    public TestNetworkHandlersProvider(String name) {
+        localName = name;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Handler which stores the received message under the local member name and prints it.
+     */
+    @Override public NetworkMessageHandler messageHandler() {
+        return event -> {
+            MESSAGE_STORAGE.put(localName, event);
+
+            System.out.println(localName + " handled messages : " + event);
+        };
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Handler which prints appeared and disappeared members.
+     */
+    @Override public NetworkClusterEventHandler clusterEventHandler() {
+        return new NetworkClusterEventHandler() {
+            /** {@inheritDoc} */
+            @Override public void onAppeared(NetworkMember member) {
+                System.out.println(localName + " found member : " + member);
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onDisappeared(NetworkMember member) {
+                System.out.println(localName + " lost member : " + member);
+            }
+        };
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/MessageHandlerHolder.java b/modules/network/src/main/java/org/apache/ignite/network/MessageHandlerHolder.java
new file mode 100644
index 0000000..dbe0e27
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/MessageHandlerHolder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network;
+
+import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Encapsulation of all cluster handlers for centralized management.
+ */
+public class MessageHandlerHolder {
+    /** Handlers for processing incoming messages. */
+    private final Collection<NetworkMessageHandler> messageHandlers = new CopyOnWriteArrayList<>();
+
+    /** Handlers for processing all cluster events. */
+    private final Collection<NetworkClusterEventHandler> clusterEventHandlers = new CopyOnWriteArrayList<>();
+
+    /**
+     * Registers a handler for incoming messages. A {@code null} handler is ignored: by default
+     * {@link NetworkHandlersProvider#messageHandler()} returns {@code null}, and storing it would make
+     * every loop over {@link #messageHandlers()} fail on that element.
+     *
+     * @param handler Handler for processing incoming messages, may be {@code null}.
+     */
+    public void addmessageHandlers(NetworkMessageHandler handler) {
+        // NOTE: the lower-case 'm' in the method name is kept as is for compatibility with existing callers.
+        if (handler != null)
+            messageHandlers.add(handler);
+    }
+
+    /**
+     * Registers a handler for cluster events. A {@code null} handler is ignored: by default
+     * {@link NetworkHandlersProvider#clusterEventHandler()} returns {@code null}, and storing it would make
+     * every loop over {@link #clusterEventHandlers()} fail on that element.
+     *
+     * @param handler Handler for processing all cluster events, may be {@code null}.
+     */
+    public void addClusterEventHandlers(NetworkClusterEventHandler handler) {
+        if (handler != null)
+            clusterEventHandlers.add(handler);
+    }
+
+    /**
+     * @return All registered handlers for processing incoming messages. This is the live internal collection,
+     * not a copy.
+     */
+    public Collection<NetworkMessageHandler> messageHandlers() {
+        return messageHandlers;
+    }
+
+    /**
+     * @return All registered handlers for processing cluster events. This is the live internal collection,
+     * not a copy.
+     */
+    public Collection<NetworkClusterEventHandler> clusterEventHandlers() {
+        return clusterEventHandlers;
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
new file mode 100644
index 0000000..5bdd576
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network;
+
+import java.util.Collection;
+import java.util.concurrent.Future;
+
+/**
+ * Main interface for interaction with the network. It allows getting information about network members and
+ * sending messages to them.
+ */
+public interface NetworkCluster {
+ /**
+ * Stops the processing of the network connection immediately. After this method has finished successfully,
+ * sending and receiving messages or obtaining information about network members is impossible.
+ *
+ * @throws Exception If something went wrong.
+ */
+ void shutdown() throws Exception;
+
+ /**
+ * @return Information about the local network member.
+ */
+ NetworkMember localMember();
+
+ /**
+ * @return Information about all members which have been seen by the local member (including the local member
+ * itself).
+ */
+ Collection<NetworkMember> allMembers();
+
+ /**
+ * Tries to send the message asynchronously to the specific member without any guarantee that this message
+ * will be delivered.
+ *
+ * @param member Network member which should receive the message.
+ * @param msg Message which should be delivered.
+ */
+ void weakSend(NetworkMember member, Object msg);
+
+ /**
+ * Tries to send the message asynchronously to the specific member with the following guarantees:
+ * <ul>
+ * <li>Messages which were sent from one thread to one member will be delivered in the same order as they
+ * were sent.</li>
+ * <li>If message N was successfully delivered to the member, all messages preceding N were also
+ * successfully delivered.</li>
+ * </ul>
+ *
+ * @param member Network member which should receive the message.
+ * @param msg Message which should be delivered.
+ * @return Future of the send operation. NOTE(review): the exact completion semantics (e.g. completes on
+ * delivery) are defined by the implementation - confirm there.
+ */
+ Future<?> guaranteedSend(NetworkMember member, Object msg);
+
+ /**
+ * Adds a provider which supplies the configured handlers for different cluster events (e.g. a received
+ * message).
+ *
+ * @param networkHandlersProvider Provider for obtaining cluster event handlers.
+ */
+ void addHandlersProvider(NetworkHandlersProvider networkHandlersProvider);
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterEventHandler.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterEventHandler.java
new file mode 100644
index 0000000..3a11e7e
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterEventHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network;
+
+/**
+ * Interface for handling events related to cluster membership changes.
+ */
+public interface NetworkClusterEventHandler {
+ /**
+ * Event which happens when a new member is detected in the cluster.
+ *
+ * @param member New network member.
+ */
+ void onAppeared(NetworkMember member);
+
+ /**
+ * Event which happens when a member leaves the cluster permanently. If the connection is lost but it is
+ * still possible to reestablish it, this event is not raised.
+ *
+ * @param member The network member which has left the cluster.
+ */
+ void onDisappeared(NetworkMember member);
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterFactory.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterFactory.java
new file mode 100644
index 0000000..6ed50b5
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import io.scalecube.cluster.Cluster;
+import io.scalecube.cluster.ClusterImpl;
+import io.scalecube.net.Address;
+import org.apache.ignite.network.scalecube.ScaleCubeMemberResolver;
+import org.apache.ignite.network.scalecube.ScaleCubeMessageHandler;
+import org.apache.ignite.network.scalecube.ScaleCubeNetworkCluster;
+
+/**
+ * Factory which creates and starts the available implementations of {@link NetworkCluster}.
+ */
+public class NetworkClusterFactory {
+    /** Unique name of the local network member. */
+    private final String localMemberName;
+
+    /** Port the local member listens on. */
+    private final int localPort;
+
+    /** Network addresses used to find other members of the cluster. */
+    private final List<String> addresses;
+
+    /**
+     * @param localMemberName Unique name of the local network member.
+     * @param port Port the local member listens on.
+     * @param addresses Network addresses used to find other members of the cluster.
+     */
+    public NetworkClusterFactory(String localMemberName, int port, List<String> addresses) {
+        this.localMemberName = localMemberName;
+        this.localPort = port;
+        this.addresses = addresses;
+    }
+
+    /**
+     * Starts a ScaleCube cluster node and wraps it into a {@link NetworkCluster}.
+     *
+     * @param memberResolver Member resolver which converts {@link org.apache.ignite.network.NetworkMember} to
+     * the inner ScaleCube type and back.
+     * @param messageHandlerHolder Holder of all cluster message handlers.
+     * @return {@link NetworkCluster} instance.
+     */
+    public NetworkCluster startScaleCubeBasedCluster(
+        ScaleCubeMemberResolver memberResolver,
+        MessageHandlerHolder messageHandlerHolder
+    ) {
+        Cluster startedCluster = new ClusterImpl()
+            .handler(cl -> new ScaleCubeMessageHandler(cl, memberResolver, messageHandlerHolder))
+            .config(cfg -> cfg.memberAlias(localMemberName).transport(transport -> transport.port(localPort)))
+            .membership(membership -> membership.seedMembers(seedAddresses()))
+            .startAwait();
+
+        return new ScaleCubeNetworkCluster(startedCluster, memberResolver, messageHandlerHolder);
+    }
+
+    /**
+     * @return Configured addresses converted to ScaleCube addresses, in the configured order.
+     */
+    private List<Address> seedAddresses() {
+        return addresses.stream()
+            .map(Address::from)
+            .collect(Collectors.toList());
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkHandlersProvider.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkHandlersProvider.java
new file mode 100644
index 0000000..91b5b75
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkHandlersProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network;
+
+/**
+ * Provider of handlers for different cluster events. Both methods have a default implementation returning
+ * {@code null}, so an implementation may override only the handler it actually supplies.
+ */
+public interface NetworkHandlersProvider {
+ /**
+ * @return Handler for processing the messages received from the cluster; {@code null} by default.
+ */
+ default NetworkMessageHandler messageHandler() {
+ return null;
+ }
+
+ /**
+ * @return Handler for processing the different cluster events; {@code null} by default.
+ */
+ default NetworkClusterEventHandler clusterEventHandler() {
+ return null;
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkMember.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkMember.java
new file mode 100644
index 0000000..9c5e2bc
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkMember.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network;
+
+import java.util.Objects;
+
+/**
+ * Representation of a network member. A member is identified by its name only: two members with equal names
+ * are equal.
+ */
+public class NetworkMember {
+    /** Unique name of the member in the cluster. */
+    private final String name;
+
+    /**
+     * @param name Unique name of the member in the cluster.
+     */
+    public NetworkMember(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return Unique name of the member in the cluster.
+     */
+    public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        // Only instances of exactly the same class can be equal.
+        if (o != null && o.getClass() == getClass())
+            return Objects.equals(name, ((NetworkMember)o).name);
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        // Yields the same value as Objects.hash(name) for a single element.
+        return 31 + Objects.hashCode(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "NetworkMember{name='" + name + "'}";
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkMessage.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkMessage.java
new file mode 100644
index 0000000..c545fc7
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkMessage.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network;
+
+/**
+ * Message for exchanging information in the cluster: custom data together with the member which sent it.
+ */
+public class NetworkMessage {
+    /** Custom data. */
+    private final Object data;
+
+    /** Network member who sent this message. */
+    private final NetworkMember senderMember;
+
+    /**
+     * @param data Custom data.
+     * @param senderMember Network member who sent this message.
+     */
+    public NetworkMessage(Object data, NetworkMember senderMember) {
+        this.data = data;
+        this.senderMember = senderMember;
+    }
+
+    /**
+     * Returns the custom data cast to the type expected by the caller. The cast is unchecked: if the actual
+     * data is of another type, a {@link ClassCastException} is thrown at the call site where the result is
+     * used as {@code T}, not inside this method.
+     *
+     * @param <T> Type of the data expected by the caller.
+     * @return Custom data.
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T data() {
+        // The type of the data is known only to the caller, hence the unchecked cast.
+        return (T)data;
+    }
+
+    /**
+     * @return Network member who sent this message.
+     */
+    public NetworkMember sender() {
+        return senderMember;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "NetworkMessage{" +
+            "data=" + data +
+            ", senderMember=" + senderMember +
+            '}';
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
new file mode 100644
index 0000000..65de5b3
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network;
+
+/**
+ * Handler of incoming messages.
+ */
+public interface NetworkMessageHandler {
+ /**
+ * Invoked with a message received from the cluster.
+ *
+ * @param message Message which was received from the cluster.
+ */
+ void onReceived(NetworkMessage message);
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMemberResolver.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMemberResolver.java
new file mode 100644
index 0000000..361e2f8
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMemberResolver.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network.scalecube;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import io.scalecube.cluster.Member;
+import org.apache.ignite.network.NetworkMember;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Resolver between ScaleCube specific members and public network members.
+ * <p>
+ * Both directions are kept in concurrent maps. A public member becomes visible in the direct map only after its
+ * reverse mapping has been registered, so a member returned by {@link #resolveNetworkMember(Member)} can always be
+ * translated back with {@link #resolveMember(NetworkMember)}.
+ */
+public class ScaleCubeMemberResolver {
+    /** Map of public network member by its unique name. */
+    private final Map<String, NetworkMember> directMemberMap = new ConcurrentHashMap<>();
+
+    /** Map of scalecube member by its public member. */
+    private final Map<NetworkMember, Member> reverseMemberMap = new ConcurrentHashMap<>();
+
+    /**
+     * Returns the existing public member for the given ScaleCube member or creates a new one.
+     * <p>
+     * Members are identified by {@link Member#alias()}.
+     *
+     * @param member ScaleCube specific member.
+     * @return Public network member instance.
+     */
+    public NetworkMember resolveNetworkMember(Member member) {
+        String alias = member.alias();
+
+        // Fast path: plain lookup for an already known member.
+        NetworkMember networkMember = directMemberMap.get(alias);
+
+        if (networkMember != null)
+            return networkMember;
+
+        return directMemberMap.computeIfAbsent(alias, name -> {
+            NetworkMember created = new NetworkMember(name);
+
+            // Register the reverse mapping before the new entry is published in the direct map. Otherwise a
+            // concurrent caller could obtain the member and fail in resolveMember() because the reverse entry
+            // is not there yet.
+            reverseMemberMap.put(created, member);
+
+            return created;
+        });
+    }
+
+    /**
+     * @param member Public network member.
+     * @return ScaleCube specific member.
+     * @throws NullPointerException If no ScaleCube member is registered for the given network member.
+     */
+    public Member resolveMember(NetworkMember member) {
+        return requireNonNull(reverseMemberMap.get(member), () -> "Unknown network member: " + member);
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java
new file mode 100644
index 0000000..9a04e66
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network.scalecube;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import io.scalecube.cluster.Cluster;
+import io.scalecube.cluster.ClusterMessageHandler;
+import io.scalecube.cluster.membership.MembershipEvent;
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.net.Address;
+import org.apache.ignite.network.NetworkClusterEventHandler;
+import org.apache.ignite.network.NetworkMember;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.MessageHandlerHolder;
+
+/**
+ * Integration class for adapting {@link NetworkMessageHandler} and {@link NetworkClusterEventHandler} in terms of
+ * ScaleCube.
+ */
+public class ScaleCubeMessageHandler implements ClusterMessageHandler {
+ /** Instance of scalecube cluster. */
+ private final Cluster cluster;
+
+ /** Resolver from/to inner member to/from public one. */
+ private final ScaleCubeMemberResolver scaleCubeMemberResolver;
+
+ /** Storage of all handlers for execution. */
+ private final MessageHandlerHolder messageHandlerHolder;
+
+ /** Utility map for recognizing member for its address(scalecube doesn't provide such information in input message). */
+ private final Map<Address, NetworkMember> addressMemberMap = new ConcurrentHashMap<>();
+
+ /**
+ * @param cluster Instance of scalecube cluster.
+ * @param resolver Resolver from/to inner member to/from public one.
+ * @param holder Storage of all handlers for execution.
+ */
+ public ScaleCubeMessageHandler(
+ Cluster cluster,
+ ScaleCubeMemberResolver resolver,
+ MessageHandlerHolder holder
+ ) {
+ this.cluster = cluster;
+ scaleCubeMemberResolver = resolver;
+ messageHandlerHolder = holder;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessage(Message message) {
+ for (NetworkMessageHandler handler : messageHandlerHolder.messageHandlers()) {
+ handler.onReceived(new NetworkMessage(message.data(), memberForAddress(message.sender())));
+ }
+ }
+
+ /**
+ * @param address Inet address.
+ * @return Network member corresponded to input address.
+ */
+ private NetworkMember memberForAddress(Address address) {
+ return addressMemberMap.computeIfAbsent(address,
+ (key) -> cluster
+ .members().stream()
+ .filter(mem -> mem.address().equals(address))
+ .map(scaleCubeMemberResolver::resolveNetworkMember)
+ .findFirst()
+ .orElse(null)
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMembershipEvent(MembershipEvent event) {
+ for (NetworkClusterEventHandler lsnr : messageHandlerHolder.clusterEventHandlers()) {
+ if (event.type() == MembershipEvent.Type.ADDED)
+ lsnr.onAppeared(scaleCubeMemberResolver.resolveNetworkMember(event.member()));
+ else if (event.type() == MembershipEvent.Type.LEAVING || event.type() == MembershipEvent.Type.REMOVED)
+ lsnr.onDisappeared((scaleCubeMemberResolver.resolveNetworkMember(event.member())));
+ else if (event.type() == MembershipEvent.Type.UPDATED) {
+ //do nothing.
+ }
+ else
+ throw new RuntimeException("This event is not supported: event = " + event);
+ }
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
new file mode 100644
index 0000000..0dc1087
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network.scalecube;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import io.scalecube.cluster.Cluster;
+import org.apache.ignite.network.NetworkCluster;
+import org.apache.ignite.network.NetworkClusterEventHandler;
+import org.apache.ignite.network.NetworkHandlersProvider;
+import org.apache.ignite.network.NetworkMember;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.MessageHandlerHolder;
+
+import static io.scalecube.cluster.transport.api.Message.fromData;
+
+/**
+ * {@link NetworkCluster} implementation which delegates to a ScaleCube {@link Cluster}.
+ */
+public class ScaleCubeNetworkCluster implements NetworkCluster {
+    /** Inner representation of cluster of scalecube. */
+    private final Cluster cluster;
+
+    /** Resolver for scalecube specific member. */
+    private final ScaleCubeMemberResolver memberResolver;
+
+    /** Holder of all cluster handlers. */
+    private final MessageHandlerHolder messageHandlerHolder;
+
+    /**
+     * @param cluster Inner representation of cluster of scalecube.
+     * @param memberResolver Resolver for scalecube specific member.
+     * @param messageHandlerHolder Holder of all cluster handlers.
+     */
+    public ScaleCubeNetworkCluster(
+        Cluster cluster,
+        ScaleCubeMemberResolver memberResolver,
+        MessageHandlerHolder messageHandlerHolder
+    ) {
+        this.cluster = cluster;
+        this.memberResolver = memberResolver;
+        this.messageHandlerHolder = messageHandlerHolder;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void shutdown() throws Exception {
+        cluster.shutdown();
+
+        // Block the caller until scalecube signals that the shutdown has completed.
+        cluster.onShutdown().block();
+    }
+
+    /** {@inheritDoc} */
+    @Override public NetworkMember localMember() {
+        return memberResolver.resolveNetworkMember(cluster.member());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<NetworkMember> allMembers() {
+        return cluster.members().stream()
+            .map(scaleCubeMember -> memberResolver.resolveNetworkMember(scaleCubeMember))
+            .collect(Collectors.toList());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void weakSend(NetworkMember member, Object msg) {
+        sendAndWait(member, msg);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This implementation blocks until the underlying send has completed, so the returned future is already
+     * completed (with {@code null}) when this method returns.
+     */
+    @Override public Future<?> guaranteedSend(NetworkMember member, Object msg) {
+        sendAndWait(member, msg);
+
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addHandlersProvider(NetworkHandlersProvider networkHandlersProvider) {
+        NetworkClusterEventHandler clusterEventHandler = networkHandlersProvider.clusterEventHandler();
+
+        if (clusterEventHandler != null)
+            messageHandlerHolder.addClusterEventHandlers(clusterEventHandler);
+
+        NetworkMessageHandler incomingMessageHandler = networkHandlersProvider.messageHandler();
+
+        if (incomingMessageHandler != null)
+            messageHandlerHolder.addmessageHandlers(incomingMessageHandler);
+    }
+
+    /**
+     * Sends the message through scalecube and blocks the calling thread until the send has completed.
+     *
+     * @param member Recipient of the message.
+     * @param msg Payload to send.
+     */
+    private void sendAndWait(NetworkMember member, Object msg) {
+        cluster.send(memberResolver.resolveMember(member), fromData(msg)).block();
+    }
+}
diff --git a/parent/pom.xml b/parent/pom.xml
index be18342..573cff7 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -71,6 +71,7 @@
<spoon.framework.version>8.3.0</spoon.framework.version>
<typesafe.version>1.4.1</typesafe.version>
<hamcrest.version>2.2</hamcrest.version>
+ <scalecube.version>2.6.6</scalecube.version>
<!-- Plugins versions -->
<apache.rat.plugin.version>0.13</apache.rat.plugin.version>
@@ -241,6 +242,12 @@
<artifactId>hamcrest-library</artifactId>
<version>${hamcrest.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>io.scalecube</groupId>
+ <artifactId>scalecube-cluster</artifactId>
+ <version>${scalecube.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/pom.xml b/pom.xml
index abf5ce6..ade1018 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,6 +40,7 @@
<module>modules/configuration-annotation-processor</module>
<module>modules/rest</module>
<module>modules/runner</module>
+ <module>modules/network</module>
</modules>
<build>