You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/04/12 12:25:26 UTC
[ignite-3] branch main updated: IGNITE-14382 Improved network module API structure. - Fixes #84.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new fcfe1a6 IGNITE-14382 Improved network module API structure. - Fixes #84.
fcfe1a6 is described below
commit fcfe1a6b409b2437dc0d0f81b2b8a8b21f7f33a3
Author: Aleksandr Polovtsev <al...@gmail.com>
AuthorDate: Mon Apr 12 15:23:56 2021 +0300
IGNITE-14382 Improved network module API structure. - Fixes #84.
Signed-off-by: Alexey Scherbakov <al...@gmail.com>
---
modules/README.md | 2 +-
modules/network/README.md | 13 +-
.../ignite/network/DirectSerializationTest.java | 26 +--
.../network/{scalecube => }/TestMessage.java | 2 +-
...r.java => TestMessageSerializationFactory.java} | 17 +-
.../ITScaleCubeNetworkClusterMessagingTest.java | 184 ---------------------
.../scalecube/ITScaleCubeNetworkMessagingTest.java | 134 +++++++++++++++
.../scalecube/TestNetworkHandlersProvider.java | 63 -------
.../scalecube/TestRequestSerializerProvider.java | 42 -----
.../scalecube/TestResponseSerializerProvider.java | 42 -----
.../ignite/network/AbstractClusterService.java | 61 +++++++
.../ignite/network/AbstractMessagingService.java} | 34 ++--
.../ignite/network/AbstractTopologyService.java} | 36 ++--
.../ignite/network/ClusterLocalConfiguration.java | 84 ++++++++++
.../{NetworkMember.java => ClusterNode.java} | 38 ++++-
...lusterEventHandler.java => ClusterService.java} | 34 ++--
...sterFactory.java => ClusterServiceFactory.java} | 12 +-
.../ignite/network/MessageHandlerHolder.java | 59 -------
.../apache/ignite/network/MessagingService.java | 74 +++++++++
.../java/org/apache/ignite/network/Network.java | 66 --------
.../org/apache/ignite/network/NetworkCluster.java | 95 -----------
.../ignite/network/NetworkClusterContext.java | 57 -------
.../ignite/network/NetworkMessageHandler.java | 6 +-
...lersProvider.java => TopologyEventHandler.java} | 17 +-
...usterEventHandler.java => TopologyService.java} | 27 +--
.../network/internal/MessageSerializerFactory.java | 55 ------
.../internal/direct/DirectMessageReader.java | 19 +--
.../internal/direct/DirectMessageWriter.java | 9 +-
.../stream/DirectByteBufferStreamImplV1.java | 15 +-
...vider.java => MessageSerializationFactory.java} | 16 +-
.../message/MessageSerializationRegistry.java | 77 +++++++++
.../scalecube/ScaleCubeClusterServiceFactory.java | 82 +++++++++
.../network/scalecube/ScaleCubeMemberResolver.java | 64 -------
.../network/scalecube/ScaleCubeMessageHandler.java | 102 ------------
.../scalecube/ScaleCubeMessagingService.java | 125 ++++++++++++++
.../network/scalecube/ScaleCubeNetworkCluster.java | 132 ---------------
.../scalecube/ScaleCubeNetworkClusterFactory.java | 103 ------------
.../scalecube/ScaleCubeTopologyService.java | 84 ++++++++++
.../java/org/apache/ignite/raft/client/Peer.java | 10 +-
.../client/service/impl/RaftGroupServiceImpl.java | 24 +--
.../raft/client/service/RaftGroupServiceTest.java | 143 ++++++++--------
.../raft/server/ITRaftCounterServerTest.java | 72 ++++----
.../org/apache/ignite/raft/server/RaftServer.java | 4 +-
.../ignite/raft/server/impl/RaftServerImpl.java | 133 +++++++--------
44 files changed, 1077 insertions(+), 1417 deletions(-)
diff --git a/modules/README.md b/modules/README.md
index beaddcf..4c78390 100644
--- a/modules/README.md
+++ b/modules/README.md
@@ -20,4 +20,4 @@ Module Name | Description
[rest](rest/README.md)|REST management endpoint bindings and command handlers
[runner](runner/README.md)|Ignite server node runner. The module that wires up the Ignite components and handles node lifecycle.
[schema](schema/README.md)|Ignite schema API implementation and schema management classes.
-[table](table/README.md)|Ignite table API implementation.
\ No newline at end of file
+[table](table/README.md)|Ignite table API implementation.
diff --git a/modules/network/README.md b/modules/network/README.md
index 14ce10e..70e789f 100644
--- a/modules/network/README.md
+++ b/modules/network/README.md
@@ -20,4 +20,15 @@ receiving messages across nodes in the cluster. Several delivery guarantee optio
On top of the described primitives, the networking module provides a higher-level request-response primitive which can
be thought of as an RPC call, implying a single response for the given request. This primitive requires that the message
being sent has a unique identifier that can be matched with response on receipt.
-
+
+## Concepts and interfaces
+
+This module provides the following interfaces and implementations:
+
+1. `ClusterService` interface represents the current node and the entry point for network-related activity in a cluster.
+2. `ClusterLocalConfiguration` contains some state of the current node, e.g. its alias and configuration.
+3. `ClusterServiceFactory` is the main way of starting a node.
+4. `TopologyService` provides information about the cluster members and allows registering listeners for topology change
+ events.
+5. `MessagingService` provides a mechanism for sending messages between network members in both weak and patient modes
+ and allows registering listeners for events related to cluster member communication.
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/DirectSerializationTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/DirectSerializationTest.java
index 80ddbd9..21ea815 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/DirectSerializationTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/DirectSerializationTest.java
@@ -18,18 +18,13 @@
package org.apache.ignite.network;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
-import org.apache.ignite.network.internal.MessageSerializerFactory;
import org.apache.ignite.network.internal.direct.DirectMessageReader;
import org.apache.ignite.network.internal.direct.DirectMessageWriter;
import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
import org.apache.ignite.network.message.MessageSerializer;
-import org.apache.ignite.network.message.MessageSerializerProvider;
-import org.apache.ignite.network.message.NetworkMessage;
-import org.apache.ignite.network.scalecube.TestMessage;
-import org.apache.ignite.network.scalecube.TestMessageSerializerProvider;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -44,13 +39,8 @@ public class DirectSerializationTest {
/** */
@Test
public void test() {
- MessageSerializerProvider[] messageMapperProviders = new MessageSerializerProvider[Short.MAX_VALUE << 1];
-
- TestMessageSerializerProvider tProv = new TestMessageSerializerProvider();
-
- messageMapperProviders[TestMessage.TYPE] = tProv;
-
- MessageSerializerFactory factory = new MessageSerializerFactory(Arrays.asList(messageMapperProviders));
+ var registry = new MessageSerializationRegistry()
+ .registerFactory(TestMessage.TYPE, new TestMessageSerializationFactory());
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 10_000; i++) {
@@ -68,7 +58,7 @@ public class DirectSerializationTest {
short directType = message.directType();
DirectMessageWriter writer = new DirectMessageWriter((byte) 1);
- MessageSerializer<NetworkMessage> serializer = factory.createSerializer(directType);
+ MessageSerializer<TestMessage> serializer = registry.createSerializer(directType);
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4096);
@@ -89,7 +79,7 @@ public class DirectSerializationTest {
byteBuffer.flip();
- DirectMessageReader reader = new DirectMessageReader(factory, (byte) 1);
+ DirectMessageReader reader = new DirectMessageReader(registry, (byte) 1);
reader.setBuffer(byteBuffer);
byte type1 = byteBuffer.get();
@@ -97,15 +87,15 @@ public class DirectSerializationTest {
short messageType = makeMessageType(type1, type2);
- MessageDeserializer<NetworkMessage> deserializer = factory.createDeserializer(messageType);
+ MessageDeserializer<TestMessage> deserializer = registry.createDeserializer(messageType);
boolean read = deserializer.readMessage(reader);
assertTrue(read);
- TestMessage readMessage = (TestMessage) deserializer.getMessage();
+ TestMessage readMessage = deserializer.getMessage();
assertEquals(message.msg(), readMessage.msg());
- assertTrue(message.getMap().equals(readMessage.getMap()));
+ assertEquals(message.getMap(), readMessage.getMap());
}
/**
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessage.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
similarity index 97%
rename from modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessage.java
rename to modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
index 342c214..d87715e 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessage.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
@@ -16,7 +16,7 @@
*/
-package org.apache.ignite.network.scalecube;
+package org.apache.ignite.network;
import java.io.Serializable;
import java.util.Map;
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessageSerializerProvider.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessageSerializationFactory.java
similarity index 87%
rename from modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessageSerializerProvider.java
rename to modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessageSerializationFactory.java
index c37458c..5d8bc45 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessageSerializerProvider.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessageSerializationFactory.java
@@ -15,23 +15,23 @@
* limitations under the License.
*/
-package org.apache.ignite.network.scalecube;
+package org.apache.ignite.network;
import java.util.Map;
import org.apache.ignite.network.internal.MessageReader;
import org.apache.ignite.network.message.MessageDeserializer;
import org.apache.ignite.network.message.MessageMappingException;
import org.apache.ignite.network.message.MessageSerializer;
-import org.apache.ignite.network.message.MessageSerializerProvider;
+import org.apache.ignite.network.message.MessageSerializationFactory;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
/**
* Mapper for {@link TestMessage}.
*/
-public class TestMessageSerializerProvider implements MessageSerializerProvider<TestMessage> {
+public class TestMessageSerializationFactory implements MessageSerializationFactory<TestMessage> {
/** {@inheritDoc} */
@Override public MessageDeserializer<TestMessage> createDeserializer() {
- return new MessageDeserializer<TestMessage>() {
+ return new MessageDeserializer<>() {
private TestMessage obj;
@@ -52,7 +52,7 @@ public class TestMessageSerializerProvider implements MessageSerializerProvider<
reader.incrementState();
- //noinspection fallthrough
+ //noinspection fallthrough
case 1:
msg = reader.readString("msg");
@@ -84,7 +84,7 @@ public class TestMessageSerializerProvider implements MessageSerializerProvider<
@Override public MessageSerializer<TestMessage> createSerializer() {
return (message, writer) -> {
if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(message.directType(), fieldsCount()))
+ if (!writer.writeHeader(message.directType(), (byte) 1))
return false;
writer.onHeaderWritten();
@@ -109,9 +109,4 @@ public class TestMessageSerializerProvider implements MessageSerializerProvider<
return true;
};
}
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 1;
- }
}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java
deleted file mode 100644
index f6886d5..0000000
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.network.scalecube;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.network.Network;
-import org.apache.ignite.network.NetworkCluster;
-import org.apache.ignite.network.NetworkClusterEventHandler;
-import org.apache.ignite.network.NetworkHandlersProvider;
-import org.apache.ignite.network.NetworkMember;
-import org.apache.ignite.network.NetworkMessageHandler;
-import org.apache.ignite.network.message.NetworkMessage;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Test;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-
-/** */
-class ITScaleCubeNetworkClusterMessagingTest {
- /** */
- private static final long CHECK_INTERVAL = 200;
-
- /** */
- private static final long CLUSTER_TIMEOUT = TimeUnit.SECONDS.toMillis(3);
-
- /** */
- private final Queue<NetworkCluster> startedMembers = new ConcurrentLinkedQueue<>();
-
- /** */
- @AfterEach
- public void afterEach() throws Exception {
- Iterator<NetworkCluster> iterator = startedMembers.iterator();
-
- while (iterator.hasNext()) {
- iterator.next().shutdown();
-
- iterator.remove();
- }
-
- TestNetworkHandlersProvider.MESSAGE_STORAGE.clear();
- }
-
- /** */
- @Test
- public void messageWasSentToAllMembersSuccessfully() throws Exception {
- //Given: Three started member which are gathered to cluster.
- List<String> addresses = List.of("localhost:3344", "localhost:3345", "localhost:3346");
-
- CountDownLatch latch = new CountDownLatch(3);
-
- NetworkCluster alice = startMember("Alice", 3344, addresses);
- NetworkCluster bob = startMember("Bob", 3345, addresses);
- NetworkCluster carol = startMember("Carol", 3346, addresses);
-
- final NetworkHandlersProvider messageWaiter = new NetworkHandlersProvider() {
- /** {@inheritDoc} */
- @Override public NetworkMessageHandler messageHandler() {
- return (message, sender, corellationId) -> {
- latch.countDown();
- };
- }
- };
-
- alice.addHandlersProvider(messageWaiter);
- bob.addHandlersProvider(messageWaiter);
- carol.addHandlersProvider(messageWaiter);
-
- waitForCluster(alice);
-
- TestMessage sentMessage = new TestMessage("Message from Alice", null);
-
- //When: Send one message to all members in cluster.
- for (NetworkMember member : alice.allMembers()) {
- System.out.println("SEND : " + member);
-
- alice.weakSend(member, sentMessage);
- }
-
- latch.await(3, TimeUnit.SECONDS);
-
- //Then: All members successfully received message.
- assertThat(getLastMessage(alice), is(sentMessage));
- assertThat(getLastMessage(bob), is(sentMessage));
- assertThat(getLastMessage(carol), is(sentMessage));
- }
-
- /** */
- private NetworkMessage getLastMessage(NetworkCluster alice) {
- return TestNetworkHandlersProvider.MESSAGE_STORAGE.get(alice.localMember().name());
- }
-
- /**
- * @return Started member.
- */
- private NetworkCluster startMember(String name, int port, List<String> addresses) {
- Network network = new Network(
- new ScaleCubeNetworkClusterFactory(name, port, addresses, new ScaleCubeMemberResolver())
- );
-
- network.registerMessageMapper(TestMessage.TYPE, new TestMessageSerializerProvider());
- network.registerMessageMapper(TestRequest.TYPE, new TestRequestSerializerProvider());
- network.registerMessageMapper(TestResponse.TYPE, new TestResponseSerializerProvider());
-
- NetworkCluster member = network.start();
-
- member.addHandlersProvider(new TestNetworkHandlersProvider(name));
-
- System.out.println("-----" + name + " started");
-
- startedMembers.add(member);
-
- return member;
- }
-
- /**
- * Wait for cluster to come up.
- * @param cluster Network cluster.
- */
- private void waitForCluster(NetworkCluster cluster) {
- AtomicInteger integer = new AtomicInteger(0);
-
- cluster.addHandlersProvider(new NetworkHandlersProvider() {
- /** {@inheritDoc} */
- @Override public NetworkClusterEventHandler clusterEventHandler() {
- return new NetworkClusterEventHandler() {
- /** {@inheritDoc} */
- @Override public void onAppeared(NetworkMember member) {
- integer.set(cluster.allMembers().size());
- }
-
- /** {@inheritDoc} */
- @Override public void onDisappeared(NetworkMember member) {
- integer.decrementAndGet();
- }
- };
- }
- });
-
- integer.set(cluster.allMembers().size());
-
- long curTime = System.currentTimeMillis();
- long endTime = curTime + CLUSTER_TIMEOUT;
-
- while (curTime < endTime) {
- if (integer.get() == startedMembers.size()) {
- return;
- }
-
- if (CHECK_INTERVAL > 0) {
- try {
- Thread.sleep(CHECK_INTERVAL);
- }
- catch (InterruptedException ignored) {
- }
- }
-
- curTime = System.currentTimeMillis();
- }
-
- throw new RuntimeException("Failed to wait for cluster startup");
- }
-
-}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
new file mode 100644
index 0000000..c6fc15f
--- /dev/null
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network.scalecube;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TopologyEventHandler;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** */
+class ITScaleCubeNetworkMessagingTest {
+ /** */
+ private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
+
+ /** */
+ private static final ClusterServiceFactory NETWORK_FACTORY = new ScaleCubeClusterServiceFactory();
+
+ /** */
+ private final Map<String, NetworkMessage> messageStorage = new ConcurrentHashMap<>();
+
+ /** */
+ private final List<ClusterService> startedMembers = new ArrayList<>();
+
+ /** */
+ @AfterEach
+ public void afterEach() {
+ startedMembers.forEach(ClusterService::shutdown);
+ }
+
+ /**
+ * Test sending and receiving messages.
+ */
+ @Test
+ public void messageWasSentToAllMembersSuccessfully() throws Exception {
+ //Given: Three started members which are gathered into a cluster.
+ List<String> addresses = List.of("localhost:3344", "localhost:3345", "localhost:3346");
+
+ CountDownLatch latch = new CountDownLatch(3);
+
+ ClusterService alice = startNetwork("Alice", 3344, addresses);
+ ClusterService bob = startNetwork("Bob", 3345, addresses);
+ ClusterService carol = startNetwork("Carol", 3346, addresses);
+
+ NetworkMessageHandler messageWaiter = (message, sender, correlationId) -> latch.countDown();
+
+ alice.messagingService().addMessageHandler(messageWaiter);
+ bob.messagingService().addMessageHandler(messageWaiter);
+ carol.messagingService().addMessageHandler(messageWaiter);
+
+ TestMessage testMessage = new TestMessage("Message from Alice", Collections.emptyMap());
+
+ //When: Send one message to all members in cluster.
+ for (ClusterNode member : alice.topologyService().allMembers()) {
+ System.out.println("SEND : " + member);
+
+ alice.messagingService().weakSend(member, testMessage);
+ }
+
+ boolean done = latch.await(3, TimeUnit.SECONDS);
+ assertTrue(done);
+
+ //Then: All members successfully received message.
+ assertThat(getLastMessage(alice), is(testMessage));
+ assertThat(getLastMessage(bob), is(testMessage));
+ assertThat(getLastMessage(carol), is(testMessage));
+ }
+
+ /** */
+ private NetworkMessage getLastMessage(ClusterService clusterService) {
+ return messageStorage.get(clusterService.localConfiguration().getName());
+ }
+
+ /** */
+ private ClusterService startNetwork(String name, int port, List<String> addresses) {
+ var context = new ClusterLocalConfiguration(name, port, addresses, SERIALIZATION_REGISTRY);
+
+ ClusterService clusterService = NETWORK_FACTORY.createClusterService(context);
+ System.out.println("-----" + name + " started");
+
+ clusterService.messagingService().addMessageHandler((message, sender, correlationId) -> {
+ messageStorage.put(name, message);
+
+ System.out.println(name + " handled messages : " + message);
+ });
+
+ clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
+ @Override public void onAppeared(ClusterNode member) {
+ System.out.println(name + " found member : " + member);
+ }
+
+ @Override public void onDisappeared(ClusterNode member) {
+ System.out.println(name + " lost member : " + member);
+ }
+ });
+
+ clusterService.start();
+
+ startedMembers.add(clusterService);
+
+ return clusterService;
+ }
+}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java
deleted file mode 100644
index 6ef82d8..0000000
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.ignite.network.scalecube;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.ignite.network.NetworkClusterEventHandler;
-import org.apache.ignite.network.NetworkHandlersProvider;
-import org.apache.ignite.network.NetworkMember;
-import org.apache.ignite.network.message.NetworkMessage;
-import org.apache.ignite.network.NetworkMessageHandler;
-
-/** */
-class TestNetworkHandlersProvider implements NetworkHandlersProvider {
- /** */
- public static Map<String, NetworkMessage> MESSAGE_STORAGE = new ConcurrentHashMap<>();
-
- /** */
- private final String localName;
-
- /** */
- TestNetworkHandlersProvider(String name) {
- localName = name;
- }
-
- /** {@inheritDoc} */
- @Override public NetworkMessageHandler messageHandler() {
- return (event, sender, corellationId) -> {
- MESSAGE_STORAGE.put(localName, event);
-
- System.out.println(localName + " handled messages : " + event);
- };
- }
-
- /** {@inheritDoc} */
- @Override public NetworkClusterEventHandler clusterEventHandler() {
- return new NetworkClusterEventHandler() {
- @Override public void onAppeared(NetworkMember member) {
- System.out.println(localName + " found member : " + member);
- }
-
- @Override public void onDisappeared(NetworkMember member) {
- System.out.println(localName + " lost member : " + member);
- }
- };
- }
-}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestRequestSerializerProvider.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestRequestSerializerProvider.java
deleted file mode 100644
index 36585af..0000000
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestRequestSerializerProvider.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.network.scalecube;
-
-import org.apache.ignite.network.message.MessageDeserializer;
-import org.apache.ignite.network.message.MessageSerializer;
-import org.apache.ignite.network.message.MessageSerializerProvider;
-
-/**
- * Mapper for {@link TestRequest}.
- */
-public class TestRequestSerializerProvider implements MessageSerializerProvider<TestRequest> {
- /** {@inheritDoc} */
- @Override public MessageDeserializer<TestRequest> createDeserializer() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public MessageSerializer<TestRequest> createSerializer() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 0;
- }
-}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestResponseSerializerProvider.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestResponseSerializerProvider.java
deleted file mode 100644
index 7eb8626..0000000
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestResponseSerializerProvider.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.network.scalecube;
-
-import org.apache.ignite.network.message.MessageDeserializer;
-import org.apache.ignite.network.message.MessageSerializer;
-import org.apache.ignite.network.message.MessageSerializerProvider;
-
-/**
- * Mapper provider for {@link TestResponse}.
- */
-public class TestResponseSerializerProvider implements MessageSerializerProvider<TestResponse> {
- /** {@inheritDoc} */
- @Override public MessageDeserializer<TestResponse> createDeserializer() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public MessageSerializer<TestResponse> createSerializer() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 0;
- }
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/AbstractClusterService.java b/modules/network/src/main/java/org/apache/ignite/network/AbstractClusterService.java
new file mode 100644
index 0000000..ec551d3
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/AbstractClusterService.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+/**
+ * Default implementation of a {@link ClusterService}.
+ * <p>
+ * Extending classes should use {@link #start()} and {@link #shutdown()} to allocate and free any network-related
+ * resources.
+ */
+public abstract class AbstractClusterService implements ClusterService {
+ /** Context. */
+ private final ClusterLocalConfiguration context;
+
+ /** Topology service. */
+ private final TopologyService topologyService;
+
+ /** Messaging service. */
+ private final MessagingService messagingService;
+
+ /** */
+ public AbstractClusterService(
+ ClusterLocalConfiguration context,
+ TopologyService topologyService,
+ MessagingService messagingService
+ ) {
+ this.context = context;
+ this.topologyService = topologyService;
+ this.messagingService = messagingService;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final ClusterLocalConfiguration localConfiguration() {
+ return context;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final TopologyService topologyService() {
+ return topologyService;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final MessagingService messagingService() {
+ return messagingService;
+ }
+}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestRequest.java b/modules/network/src/main/java/org/apache/ignite/network/AbstractMessagingService.java
similarity index 55%
rename from modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestRequest.java
rename to modules/network/src/main/java/org/apache/ignite/network/AbstractMessagingService.java
index f6ea436..61046da 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestRequest.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/AbstractMessagingService.java
@@ -15,34 +15,28 @@
* limitations under the License.
*/
-package org.apache.ignite.network.scalecube;
+package org.apache.ignite.network;
-import org.apache.ignite.network.message.NetworkMessage;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CopyOnWriteArrayList;
/**
- * Test request which requires {@link TestResponse} as a response.
+ * Base class for {@link MessagingService} implementations.
*/
-public class TestRequest implements NetworkMessage {
- /** Public type for tests. */
- public static final short TYPE = 1;
+public abstract class AbstractMessagingService implements MessagingService {
+ /** */
+ private final Collection<NetworkMessageHandler> messageHandlers = new CopyOnWriteArrayList<>();
- /** Some test value. */
- private final int number;
-
- /** Constructor. */
- public TestRequest(int number) {
- this.number = number;
+ /** {@inheritDoc} */
+ @Override public void addMessageHandler(NetworkMessageHandler handler) {
+ messageHandlers.add(handler);
}
/**
- * @return Test value.
+ * @return registered message handlers.
*/
- public int number() {
- return number;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return TYPE;
+ public Collection<NetworkMessageHandler> getMessageHandlers() {
+ return Collections.unmodifiableCollection(messageHandlers);
}
}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestResponse.java b/modules/network/src/main/java/org/apache/ignite/network/AbstractTopologyService.java
similarity index 56%
rename from modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestResponse.java
rename to modules/network/src/main/java/org/apache/ignite/network/AbstractTopologyService.java
index 13697d3..4ffd5a9 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestResponse.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/AbstractTopologyService.java
@@ -15,36 +15,28 @@
* limitations under the License.
*/
-package org.apache.ignite.network.scalecube;
+package org.apache.ignite.network;
-import org.apache.ignite.network.message.NetworkMessage;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CopyOnWriteArrayList;
/**
- * Test response.
+ * Base class for {@link TopologyService} implementations.
*/
-public class TestResponse implements NetworkMessage {
- /** Public type for tests. */
- public static final short TYPE = 2;
+public abstract class AbstractTopologyService implements TopologyService {
+ /** */
+ private final Collection<TopologyEventHandler> eventHandlers = new CopyOnWriteArrayList<>();
- /**
- * Some response test value.
- */
- private final int responseNumber;
-
- /** Constructor. */
- public TestResponse(int responseNumber) {
- this.responseNumber = responseNumber;
+ /** {@inheritDoc} */
+ @Override public void addEventHandler(TopologyEventHandler handler) {
+ eventHandlers.add(handler);
}
/**
- * @return Response test value.
+ * @return registered event handlers.
*/
- public int responseNumber() {
- return responseNumber;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return TYPE;
+ public Collection<TopologyEventHandler> getEventHandlers() {
+ return Collections.unmodifiableCollection(eventHandlers);
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java b/modules/network/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
new file mode 100644
index 0000000..2bc8e53
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.util.List;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+
+/**
+ * Network configuration of a node.
+ *
+ * TODO: migrate to common configuration class when it's available, see
+ * https://issues.apache.org/jira/browse/IGNITE-14496
+ */
+public class ClusterLocalConfiguration {
+ /** The network alias of a node. */
+ private final String name;
+
+ /** The port. */
+ private final int port;
+
+ /** Addresses of other nodes. */
+ private final List<String> memberAddresses;
+
+ /** Message serialization registry. */
+ private final MessageSerializationRegistry serializationRegistry;
+
+ /**
+ * @param name Local name.
+ * @param port Local port.
+ * @param memberAddresses Other cluster member addresses.
+ * @param serializationRegistry Message serialization registry.
+ */
+ public ClusterLocalConfiguration(
+ String name, int port, List<String> memberAddresses, MessageSerializationRegistry serializationRegistry
+ ) {
+ this.name = name;
+ this.port = port;
+ this.memberAddresses = List.copyOf(memberAddresses);
+ this.serializationRegistry = serializationRegistry;
+ }
+
+ /**
+ * Network alias of a node.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Port.
+ */
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Addresses of other nodes.
+ */
+ public List<String> getMemberAddresses() {
+ return memberAddresses;
+ }
+
+ /**
+ * Message serialization registry.
+ */
+ public MessageSerializationRegistry getSerializationRegistry() {
+ return serializationRegistry;
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkMember.java b/modules/network/src/main/java/org/apache/ignite/network/ClusterNode.java
similarity index 67%
rename from modules/network/src/main/java/org/apache/ignite/network/NetworkMember.java
rename to modules/network/src/main/java/org/apache/ignite/network/ClusterNode.java
index 41ede74..e0ad6a7 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkMember.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/ClusterNode.java
@@ -20,17 +20,25 @@ import java.io.Serializable;
import java.util.Objects;
/**
- * Representation of the network member.
+ * Representation of a node in a cluster.
*/
-public class NetworkMember implements Serializable {
+public class ClusterNode implements Serializable {
/** Unique name of member in cluster. */
private final String name;
+ /** Node host. */
+ private final String host;
+
+ /** Node port. */
+ private final int port;
+
/**
* @param name Unique name of member in cluster.
*/
- public NetworkMember(String name) {
+ public ClusterNode(String name, String host, int port) {
this.name = name;
+ this.host = host;
+ this.port = port;
}
/**
@@ -40,25 +48,41 @@ public class NetworkMember implements Serializable {
return name;
}
+ /**
+ * @return node host name.
+ */
+ public String host() {
+ return host;
+ }
+
+ /**
+ * @return node port.
+ */
+ public int port() {
+ return port;
+ }
+
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
- NetworkMember member = (NetworkMember)o;
- return Objects.equals(name, member.name);
+ ClusterNode that = (ClusterNode)o;
+ return port == that.port && name.equals(that.name) && host.equals(that.host);
}
/** {@inheritDoc} */
@Override public int hashCode() {
- return Objects.hash(name);
+ return Objects.hash(name, host, port);
}
/** {@inheritDoc} */
@Override public String toString() {
- return "NetworkMember{" +
+ return "ClusterNode{" +
"name='" + name + '\'' +
+ ", host='" + host + '\'' +
+ ", port=" + port +
'}';
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterEventHandler.java b/modules/network/src/main/java/org/apache/ignite/network/ClusterService.java
similarity index 52%
copy from modules/network/src/main/java/org/apache/ignite/network/NetworkClusterEventHandler.java
copy to modules/network/src/main/java/org/apache/ignite/network/ClusterService.java
index 3a11e7e..d218bf7 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterEventHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/ClusterService.java
@@ -14,24 +14,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.ignite.network;
/**
- * Interface for handling events related to cluster changes.
+ * Class that represents the network-related resources of a node and provides entry points for working with the
+ * network members of a cluster.
*/
-public interface NetworkClusterEventHandler {
+public interface ClusterService {
+ /**
+ * Returns the {@link TopologyService} for working with the cluster topology.
+ */
+ TopologyService topologyService();
+
+ /**
+ * Returns the {@link MessagingService} for sending messages to the cluster members.
+ */
+ MessagingService messagingService();
+
+ /**
+ * Returns the local configuration associated with the current node.
+ */
+ ClusterLocalConfiguration localConfiguration();
+
/**
- * Event which happened when one new member was detected in cluster.
- *
- * @param member New network member.
+ * Starts the current node, allowing it to join the cluster and start receiving messages.
*/
- void onAppeared(NetworkMember member);
+ void start();
/**
- * Event which happened when one member leave the cluster. It means the member leaves the cluster permanently. If
- * the connection lost but it is possible to reestablish it, nothing happens here.
- *
- * @param member The network member which leaves the cluster.
+ * Stops the current node, gracefully freeing the encapsulated resources.
*/
- void onDisappeared(NetworkMember member);
+ void shutdown();
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterFactory.java b/modules/network/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
similarity index 77%
rename from modules/network/src/main/java/org/apache/ignite/network/NetworkClusterFactory.java
rename to modules/network/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
index fd26450..b28c42d 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
@@ -16,14 +16,10 @@
*/
package org.apache.ignite.network;
-/**
- * Factory for creating {@link NetworkCluster}.
- */
-public interface NetworkClusterFactory {
+/** Factory for creating {@link ClusterService} instances. */
+public interface ClusterServiceFactory {
/**
- *
- * @param clusterContext
- * @return
+ * Creates a new {@link ClusterService} using the provided context. The created service will not be in the "started" state.
*/
- NetworkCluster startCluster(NetworkClusterContext clusterContext);
+ ClusterService createClusterService(ClusterLocalConfiguration context);
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/MessageHandlerHolder.java b/modules/network/src/main/java/org/apache/ignite/network/MessageHandlerHolder.java
deleted file mode 100644
index dbe0e27..0000000
--- a/modules/network/src/main/java/org/apache/ignite/network/MessageHandlerHolder.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.network;
-
-import java.util.Collection;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-/**
- * Encapsulation of all cluster handlers for centralized management.
- */
-public class MessageHandlerHolder {
- /** Handler for processing incoming messages. */
- private final Collection<NetworkMessageHandler> messageHandlers = new CopyOnWriteArrayList<>();
-
- /** Handler for processing all cluster events. */
- private final Collection<NetworkClusterEventHandler> clusterEventHandlers = new CopyOnWriteArrayList<>();
-
- /**
- * @param handler Handler for processing incoming messages.
- */
- public void addmessageHandlers(NetworkMessageHandler handler) {
- messageHandlers.add(handler);
- }
-
- /**
- * @param handler Handler for processing all cluster events.
- */
- public void addClusterEventHandlers(NetworkClusterEventHandler handler) {
- clusterEventHandlers.add(handler);
- }
-
- /**
- * @return All handlers for processing incoming messages.
- */
- public Collection<NetworkMessageHandler> messageHandlers() {
- return messageHandlers;
- }
-
- /**
- * @return All handler for processing all cluster events.
- */
- public Collection<NetworkClusterEventHandler> clusterEventHandlers() {
- return clusterEventHandlers;
- }
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/MessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/MessagingService.java
new file mode 100644
index 0000000..f4e84c3
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/MessagingService.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Entry point for sending messages between network members in both weak and patient mode.
+ *
+ * TODO: allow removing event handlers, see https://issues.apache.org/jira/browse/IGNITE-14519
+ */
+public interface MessagingService {
+ /**
+ * Tries to send the given message asynchronously to the specific member without any delivery guarantees.
+ *
+ * @param recipient Recipient of the message.
+ * @param msg Message which should be delivered.
+ */
+ void weakSend(ClusterNode recipient, NetworkMessage msg);
+
+ /**
+ * Tries to send the given message asynchronously to the specific cluster member with the following guarantees:
+ * <ul>
+ * <li>Messages will be delivered in the same order as they were sent;</li>
+ * <li>Successful delivery of a message N to a member implies that all messages preceding N
+ * have also been successfully delivered.</li>
+ * </ul>
+ *
+ * @param recipient Recipient of the message.
+ * @param msg Message which should be delivered.
+ */
+ CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg);
+
+ /**
+ * Same as {@link #send(ClusterNode, NetworkMessage)} but attaches the given correlation ID to the given message.
+ *
+ * @param recipient Recipient of the message.
+ * @param msg Message which should be delivered.
+ * @param correlationId Correlation id when replying to the request.
+ */
+ CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg, String correlationId);
+
+ /**
+ * Sends a message asynchronously with same guarantees as {@link #send(ClusterNode, NetworkMessage)} and
+ * returns a future that will be completed successfully upon receiving a response.
+ *
+ * @param recipient Recipient of the message.
+ * @param msg A message.
+ * @param timeout Waiting for response timeout in milliseconds.
+ * @return A future holding the response or error if the expected response was not received.
+ */
+ CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout);
+
+ /**
+ * Registers a handler for network message events.
+ */
+ void addMessageHandler(NetworkMessageHandler handler);
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/Network.java b/modules/network/src/main/java/org/apache/ignite/network/Network.java
deleted file mode 100644
index 6d465da..0000000
--- a/modules/network/src/main/java/org/apache/ignite/network/Network.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.network;
-
-import java.util.Arrays;
-import java.util.Collections;
-import org.apache.ignite.network.message.MessageSerializerProvider;
-
-/**
- * Entry point for network module.
- */
-public class Network {
- /** Message mapper providers, messageMapperProviders[message type] -> message mapper provider for message with message type. */
- private final MessageSerializerProvider<?>[] messageSerializerProviders = new MessageSerializerProvider<?>[Short.MAX_VALUE << 1];
-
- /** Message handlers. */
- private final MessageHandlerHolder messageHandlerHolder = new MessageHandlerHolder();
-
- /** Cluster factory. */
- private final NetworkClusterFactory clusterFactory;
-
- /**
- * Constructor.
- * @param factory Cluster factory.
- */
- public Network(NetworkClusterFactory factory) {
- clusterFactory = factory;
- }
-
- /**
- * Register message mapper by message type.
- * @param type Message type.
- * @param mapperProvider Message mapper provider.
- */
- public void registerMessageMapper(short type, MessageSerializerProvider mapperProvider) throws NetworkConfigurationException {
- if (this.messageSerializerProviders[type] != null)
- throw new NetworkConfigurationException("Message mapper for type " + type + " is already defined");
-
- this.messageSerializerProviders[type] = mapperProvider;
- }
-
- /**
- * Start new cluster.
- * @return Network cluster.
- */
- public NetworkCluster start() {
- //noinspection Java9CollectionFactory
- NetworkClusterContext context = new NetworkClusterContext(messageHandlerHolder, Collections.unmodifiableList(Arrays.asList(messageSerializerProviders)));
- return clusterFactory.startCluster(context);
- }
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
deleted file mode 100644
index a03f084..0000000
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.network;
-
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import org.apache.ignite.network.message.NetworkMessage;
-
-/**
- * Main interface for interaction with network. It allows to get information about network members and send messages to
- * them.
- */
-public interface NetworkCluster {
- /**
- * Stop the processing of network connection immediately. Sending and receiving messages or obtaining network
- * members information after this method successfully finished will be impossible.
- *
- * @throws Exception If something went wrong.
- */
- void shutdown() throws Exception;
-
- /**
- * @return Information about local network member.
- */
- NetworkMember localMember();
-
- /**
- * @return Information about all members which have seen by the local member(including local member itself).
- */
- Collection<NetworkMember> allMembers();
-
- /**
- * Try to send the message asynchronously to the specific member without any guarantees that this message would be
- * delivered.
- *
- * @param member Network member which should receive the message.
- * @param msg Message which should be delivered.
- */
- void weakSend(NetworkMember member, NetworkMessage msg);
-
- /**
- * Try to send the message asynchronously to the specific member with next guarantees:
- * * Messages which was sent from one thread to one member will be delivered in the same order as they were sent.
- * * If message N was successfully delivered to the member that means all messages preceding N also were successfully delivered.
- *
- * @param member Network member which should receive the message.
- * @param msg Message which should be delivered.
- */
- Future<Void> send(NetworkMember member, NetworkMessage msg);
-
- /**
- * Try to send the message asynchronously to the specific member with next guarantees:
- * * Messages which was sent from one thread to one member will be delivered in the same order as they were sent.
- * * If message N was successfully delivered to the member that means all messages preceding N also were successfully delivered.
- *
- * @param member Network member which should receive the message.
- * @param msg Message which should be delivered.
- * @param corellationId Corellation id when replying to the request.
- */
- Future<?> send(NetworkMember member, NetworkMessage msg, String corellationId);
-
- /**
- * Sends a message asynchronously with same guarantees as for {@link #send(NetworkMember, NetworkMessage)} and
- * returns a response.
- *
- * @param member Network member which should receive the message.
- * @param msg A message.
- * @param timeout Waiting for response timeout in milliseconds.
- * @param <R> Expected response type.
- * @return A future holding the response or error if the expected response was not received.
- */
- CompletableFuture<NetworkMessage> invoke(NetworkMember member, NetworkMessage msg, long timeout);
-
- /**
- * Add provider which allows to get configured handlers for different cluster events(ex. received message).
- *
- * @param networkHandlersProvider Provider for obtaining cluster event handlers.
- */
- void addHandlersProvider(NetworkHandlersProvider networkHandlersProvider);
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterContext.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterContext.java
deleted file mode 100644
index 5d3ae4a..0000000
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterContext.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.network;
-
-import java.util.Collections;
-import java.util.List;
-import org.apache.ignite.network.message.MessageSerializerProvider;
-
-/**
- * Cluster context.
- */
-public class NetworkClusterContext {
- /** Message handlers. */
- private final MessageHandlerHolder messageHandlerHolder;
-
- /** Message mappers, messageMapperProviders[message type] -> message mapper provider for message with message type. */
- private final List<MessageSerializerProvider<?>> messageSerializerProviders;
-
- /**
- * Constructor.
- * @param messageHandlerHolder Message handlers.
- * @param messageSerializerProviders Message mappers map.
- */
- public NetworkClusterContext(MessageHandlerHolder messageHandlerHolder, List<MessageSerializerProvider<?>> messageSerializerProviders) {
- this.messageHandlerHolder = messageHandlerHolder;
- this.messageSerializerProviders = messageSerializerProviders;
- }
-
- /**
- * @return Message handlers.
- */
- public MessageHandlerHolder messageHandlerHolder() {
- return messageHandlerHolder;
- }
-
- /**
- * @return Message mapper providers list.
- */
- public List<MessageSerializerProvider<?>> messageMapperProviders() {
- return Collections.unmodifiableList(messageSerializerProviders);
- }
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
index 9e4e784..e199d4f 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
@@ -23,9 +23,9 @@ import org.apache.ignite.network.message.NetworkMessage;
*/
public interface NetworkMessageHandler {
/**
- * @param message Message which was received from cluster.
+ * @param message Message which was received from the cluster.
* @param sender Sender.
- * @param corellationId Corellation id.
+ * @param correlationId Correlation id.
*/
- void onReceived(NetworkMessage message, NetworkMember sender, String corellationId);
+ void onReceived(NetworkMessage message, ClusterNode sender, String correlationId);
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkHandlersProvider.java b/modules/network/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
similarity index 66%
rename from modules/network/src/main/java/org/apache/ignite/network/NetworkHandlersProvider.java
rename to modules/network/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
index 91b5b75..3c4cb36 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkHandlersProvider.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
@@ -17,20 +17,17 @@
package org.apache.ignite.network;
/**
- * Provider of handlers of different cluster events.
+ * Interface for handling events related to topology changes.
*/
-public interface NetworkHandlersProvider {
+public interface TopologyEventHandler {
/**
- * @return Handler for processing the received messages from the cluster.
+ * Called when a new member has been detected joining a cluster.
*/
- default NetworkMessageHandler messageHandler() {
- return null;
- }
+ void onAppeared(ClusterNode member);
/**
- * @return Handler for processing the different cluster events.
+ * Indicates that a member has left a cluster. This method is only called when a member leaves permanently (i.e.
+ * it is not possible to re-establish a connection to it).
*/
- default NetworkClusterEventHandler clusterEventHandler() {
- return null;
- }
+ void onDisappeared(ClusterNode member);
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterEventHandler.java b/modules/network/src/main/java/org/apache/ignite/network/TopologyService.java
similarity index 58%
rename from modules/network/src/main/java/org/apache/ignite/network/NetworkClusterEventHandler.java
rename to modules/network/src/main/java/org/apache/ignite/network/TopologyService.java
index 3a11e7e..03c52a2 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterEventHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/TopologyService.java
@@ -16,22 +16,27 @@
*/
package org.apache.ignite.network;
+import java.util.Collection;
+
/**
- * Interface for handling events related to cluster changes.
+ * Entry point for obtaining information about a cluster's topology.
+ *
+ * TODO: allow removing event handlers, see https://issues.apache.org/jira/browse/IGNITE-14519
*/
-public interface NetworkClusterEventHandler {
+public interface TopologyService {
+ /**
+ * @return Information about the local network member.
+ */
+ ClusterNode localMember();
+
/**
- * Event which happened when one new member was detected in cluster.
- *
- * @param member New network member.
+ * @return Information about all members which have been discovered by the local member (including the local member
+ * itself).
*/
- void onAppeared(NetworkMember member);
+ Collection<ClusterNode> allMembers();
/**
- * Event which happened when one member leave the cluster. It means the member leaves the cluster permanently. If
- * the connection lost but it is possible to reestablish it, nothing happens here.
- *
- * @param member The network member which leaves the cluster.
+ * Registers a handler for topology change events.
*/
- void onDisappeared(NetworkMember member);
+ void addEventHandler(TopologyEventHandler handler);
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/MessageSerializerFactory.java b/modules/network/src/main/java/org/apache/ignite/network/internal/MessageSerializerFactory.java
deleted file mode 100644
index d5d8942..0000000
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/MessageSerializerFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.network.internal;
-
-import java.util.List;
-import org.apache.ignite.network.message.MessageDeserializer;
-import org.apache.ignite.network.message.MessageSerializer;
-import org.apache.ignite.network.message.MessageSerializerProvider;
-import org.apache.ignite.network.message.NetworkMessage;
-
-/**
- * Factory that provides message serializers and deserializers by {@link NetworkMessage#directType()}.
- */
-public class MessageSerializerFactory {
- /** List of all serializers. Index is the direct type of the message. */
- private final List<MessageSerializerProvider<NetworkMessage>> serializerProviders;
-
- /** Constructor. */
- public MessageSerializerFactory(List<MessageSerializerProvider<NetworkMessage>> mappers) {
- serializerProviders = mappers;
- }
-
- /**
- * Creates a deserializer for a message of the given direct type.
- * @param directType Message's direct type.
- * @return Message deserializer.
- */
- public MessageDeserializer<NetworkMessage> createDeserializer(short directType) {
- return serializerProviders.get(directType).createDeserializer();
- }
-
- /**
- * Creates a serializer for a message of the given direct type.
- * @param directType Message's direct type.
- * @return Message serializer.
- */
- public MessageSerializer<NetworkMessage> createSerializer(short directType) {
- return serializerProviders.get(directType).createSerializer();
- }
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageReader.java b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageReader.java
index 2d4ebbc..6fcbb10 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageReader.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageReader.java
@@ -22,14 +22,13 @@ import java.util.BitSet;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
-import java.util.function.Supplier;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.internal.MessageReader;
-import org.apache.ignite.network.internal.MessageSerializerFactory;
import org.apache.ignite.network.internal.direct.state.DirectMessageState;
import org.apache.ignite.network.internal.direct.state.DirectMessageStateItem;
import org.apache.ignite.network.internal.direct.stream.DirectByteBufferStream;
import org.apache.ignite.network.internal.direct.stream.DirectByteBufferStreamImplV1;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.jetbrains.annotations.Nullable;
@@ -45,15 +44,11 @@ public class DirectMessageReader implements MessageReader {
private boolean lastRead;
/**
- * @param msgFactory Message factory.
+ * @param serializationRegistry Message serializers.
* @param protoVer Protocol version.
*/
- public DirectMessageReader(final MessageSerializerFactory msgFactory, final byte protoVer) {
- state = new DirectMessageState<>(StateItem.class, new Supplier<StateItem>() {
- @Override public StateItem get() {
- return new StateItem(msgFactory, protoVer);
- }
- });
+ public DirectMessageReader(MessageSerializationRegistry serializationRegistry, byte protoVer) {
+ state = new DirectMessageState<>(StateItem.class, () -> new StateItem(serializationRegistry, protoVer));
}
/** {@inheritDoc} */
@@ -387,13 +382,13 @@ public class DirectMessageReader implements MessageReader {
private int state;
/**
- * @param msgFactory Message factory.
+ * @param serializationRegistry Message serializers.
* @param protoVer Protocol version.
*/
- StateItem(MessageSerializerFactory msgFactory, byte protoVer) {
+ StateItem(MessageSerializationRegistry serializationRegistry, byte protoVer) {
switch (protoVer) {
case 1:
- stream = new DirectByteBufferStreamImplV1(msgFactory);
+ stream = new DirectByteBufferStreamImplV1(serializationRegistry);
break;
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageWriter.java b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageWriter.java
index 1fbe6e4..72c514c 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageWriter.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageWriter.java
@@ -22,7 +22,6 @@ import java.util.BitSet;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
-import java.util.function.Supplier;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.internal.MessageWriter;
import org.apache.ignite.network.internal.direct.state.DirectMessageState;
@@ -40,12 +39,8 @@ public class DirectMessageWriter implements MessageWriter {
/**
* @param protoVer Protocol version.
*/
- public DirectMessageWriter(final byte protoVer) {
- state = new DirectMessageState<>(StateItem.class, new Supplier<StateItem>() {
- @Override public StateItem get() {
- return new StateItem(protoVer);
- }
- });
+ public DirectMessageWriter(byte protoVer) {
+ state = new DirectMessageState<>(StateItem.class, () -> new StateItem(protoVer));
}
/** {@inheritDoc} */
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java
index 06f1a8d..f910c54 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java
@@ -32,9 +32,9 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.internal.MessageReader;
-import org.apache.ignite.network.internal.MessageSerializerFactory;
import org.apache.ignite.network.internal.MessageWriter;
import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
import org.apache.ignite.network.message.MessageSerializer;
import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -205,7 +205,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
private static final Object NULL = new Object();
/** */
- private final MessageSerializerFactory msgSerFactory;
+ private final MessageSerializationRegistry serializationRegistry;
/** */
private ByteBuffer buf;
@@ -298,10 +298,10 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
private byte[] curStrBackingArr;
/**
- * @param msgSerFactory Message factory.
+ * @param serializationRegistry Message serialization registry.
*/
- public DirectByteBufferStreamImplV1(MessageSerializerFactory msgSerFactory) {
- this.msgSerFactory = msgSerFactory;
+ public DirectByteBufferStreamImplV1(MessageSerializationRegistry serializationRegistry) {
+ this.serializationRegistry = serializationRegistry;
}
/** {@inheritDoc} */
@@ -678,7 +678,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
writer.setCurrentWriteClass(msg.getClass());
- MessageSerializer<NetworkMessage> serializer = msgSerFactory.createSerializer(msg.directType());
+ MessageSerializer<NetworkMessage> serializer = serializationRegistry.createSerializer(msg.directType());
writer.setBuffer(buf);
@@ -1181,7 +1181,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
short type = readShort();
- msgDeserializer = type == Short.MIN_VALUE ? null : msgSerFactory.createDeserializer(type);
+ msgDeserializer = type == Short.MIN_VALUE ? null : serializationRegistry.createDeserializer(type);
msgTypeDone = true;
}
@@ -1802,4 +1802,3 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
public T create(int len);
}
}
-
diff --git a/modules/network/src/main/java/org/apache/ignite/network/message/MessageSerializerProvider.java b/modules/network/src/main/java/org/apache/ignite/network/message/MessageSerializationFactory.java
similarity index 75%
rename from modules/network/src/main/java/org/apache/ignite/network/message/MessageSerializerProvider.java
rename to modules/network/src/main/java/org/apache/ignite/network/message/MessageSerializationFactory.java
index b6dab48..2720dc2 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/message/MessageSerializerProvider.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/message/MessageSerializationFactory.java
@@ -18,25 +18,21 @@
package org.apache.ignite.network.message;
/**
- * MessageMapperProvider creates {@link MessageDeserializer} and {@link MessageSerializer} instances
- * for working with {@link NetworkMessage} objects.
+ * Creates {@link MessageDeserializer} and {@link MessageSerializer} instances for working with
+ * {@link NetworkMessage} objects.
+ *
* @param <M> Message type.
*/
-public interface MessageSerializerProvider<M extends NetworkMessage> {
+public interface MessageSerializationFactory<M extends NetworkMessage> {
/**
- * Create deserializer.
+ * Creates a deserializer.
* @return Message deserializer.
*/
MessageDeserializer<M> createDeserializer();
/**
- * Create serializer.
+ * Creates a serializer.
* @return Message serializer.
*/
MessageSerializer<M> createSerializer();
-
- /**
- * @return Message's field count.
- */
- byte fieldsCount();
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/message/MessageSerializationRegistry.java b/modules/network/src/main/java/org/apache/ignite/network/message/MessageSerializationRegistry.java
new file mode 100644
index 0000000..0276e63
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/message/MessageSerializationRegistry.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.message;
+
+import org.apache.ignite.network.NetworkConfigurationException;
+
+/**
+ * Container that maps message types to {@link MessageSerializationFactory} instances.
+ */
+public final class MessageSerializationRegistry {
+ /** message type -> MessageSerializationFactory instance */
+ private final MessageSerializationFactory<?>[] factories = new MessageSerializationFactory<?>[Short.MAX_VALUE << 1];
+
+ /**
+ * Registers a message serialization factory for the given message type.
+ */
+ public MessageSerializationRegistry registerFactory(
+ short type, MessageSerializationFactory<?> factory
+ ) throws NetworkConfigurationException {
+ if (this.factories[type] != null)
+ throw new NetworkConfigurationException("Message mapper for type " + type + " is already defined");
+
+ this.factories[type] = factory;
+
+ return this;
+ }
+
+ /**
+ * Returns a {@link MessageSerializationFactory} for the given message type.
+ */
+ private <T extends NetworkMessage> MessageSerializationFactory<T> getFactory(short type) {
+ var provider = factories[type];
+
+ assert provider != null : "No serializer provider defined for type " + type;
+
+ return (MessageSerializationFactory<T>) provider;
+ }
+
+ /**
+ * Creates a {@link MessageSerializer} for the given message type.
+ * <p>
+ * {@link MessageSerializationRegistry} does not track the correspondence between the message type and its Java
+ * representation, so the actual generic specialization of the returned serializer relies on the caller of this
+ * method.
+ */
+ public <T extends NetworkMessage> MessageSerializer<T> createSerializer(short type) {
+ MessageSerializationFactory<T> factory = getFactory(type);
+ return factory.createSerializer();
+ }
+
+ /**
+ * Creates a {@link MessageDeserializer} for the given message type.
+ * <p>
+ * {@link MessageSerializationRegistry} does not track the correspondence between the message type and its Java
+ * representation, so the actual generic specialization of the returned deserializer relies on the caller of this
+ * method.
+ */
+ public <T extends NetworkMessage> MessageDeserializer<T> createDeserializer(short type) {
+ MessageSerializationFactory<T> factory = getFactory(type);
+ return factory.createDeserializer();
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
new file mode 100644
index 0000000..6b473a0
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import io.scalecube.cluster.ClusterImpl;
+import io.scalecube.cluster.ClusterMessageHandler;
+import io.scalecube.cluster.membership.MembershipEvent;
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.net.Address;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkConfigurationException;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.AbstractClusterService;
+
+/**
+ * {@link ClusterServiceFactory} implementation that uses ScaleCube for messaging and topology services.
+ */
+public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
+ /** {@inheritDoc} */
+ @Override public ClusterService createClusterService(ClusterLocalConfiguration context) {
+ var topologyService = new ScaleCubeTopologyService();
+ var messagingService = new ScaleCubeMessagingService(topologyService);
+
+ var cluster = new ClusterImpl()
+ .handler(cl -> new ClusterMessageHandler() {
+ @Override public void onMessage(Message message) {
+ messagingService.fireEvent(message);
+ }
+
+ @Override public void onMembershipEvent(MembershipEvent event) {
+ topologyService.fireEvent(event);
+ }
+ })
+ .config(opts -> opts.memberAlias(context.getName()))
+ .transport(opts -> opts.port(context.getPort()))
+ .membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())));
+
+ // resolve cyclic dependencies
+ topologyService.setCluster(cluster);
+ messagingService.setCluster(cluster);
+
+ return new AbstractClusterService(context, topologyService, messagingService) {
+ @Override public void start() {
+ cluster.startAwait();
+ }
+
+ @Override public void shutdown() {
+ cluster.shutdown();
+ cluster.onShutdown().block();
+ }
+ };
+ }
+
+ /** */
+ private List<Address> parseAddresses(List<String> addresses) {
+ try {
+ return addresses.stream()
+ .map(Address::from)
+ .collect(Collectors.toList());
+ } catch (IllegalArgumentException e) {
+ throw new NetworkConfigurationException("Failed to parse address", e);
+ }
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMemberResolver.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMemberResolver.java
deleted file mode 100644
index 361e2f8..0000000
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMemberResolver.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.network.scalecube;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import io.scalecube.cluster.Member;
-import org.apache.ignite.network.NetworkMember;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Resolver for scalecube specific member.
- */
-public class ScaleCubeMemberResolver {
- /** Map of public network member by its unique name. */
- private final Map<String, NetworkMember> directMemberMap = new ConcurrentHashMap<>();
-
- /** Map of scalecube member by its public member. */
- private final Map<NetworkMember, Member> reverseMemberMap = new ConcurrentHashMap<>();
-
- /**
- * Getting the existed member by scalecube member or create new one.
- *
- * @param member ScaleCube specific member.
- * @return Public network member instance.
- */
- public NetworkMember resolveNetworkMember(Member member) {
- String alias = member.alias();
-
- NetworkMember networkMember = directMemberMap.get(alias);
-
- if (networkMember != null)
- return networkMember;
-
- networkMember = directMemberMap.computeIfAbsent(alias, NetworkMember::new);
-
- reverseMemberMap.put(networkMember, member);
-
- return networkMember;
- }
-
- /**
- * @param member Public network member.
- * @return ScaleCube specific member.
- */
- public Member resolveMember(NetworkMember member) {
- return requireNonNull(reverseMemberMap.get(member));
- }
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java
deleted file mode 100644
index 2e66249..0000000
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.network.scalecube;
-
-import io.scalecube.cluster.transport.api.Message;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import io.scalecube.cluster.Cluster;
-import io.scalecube.cluster.ClusterMessageHandler;
-import io.scalecube.cluster.membership.MembershipEvent;
-import io.scalecube.net.Address;
-import org.apache.ignite.network.NetworkClusterEventHandler;
-import org.apache.ignite.network.NetworkMember;
-import org.apache.ignite.network.message.NetworkMessage;
-import org.apache.ignite.network.NetworkMessageHandler;
-import org.apache.ignite.network.MessageHandlerHolder;
-
-/**
- * Integration class for adapting {@link NetworkMessageHandler} and {@link NetworkClusterEventHandler} in terms of
- * ScaleCube.
- */
-public class ScaleCubeMessageHandler implements ClusterMessageHandler {
- /** Instance of scalecube cluster. */
- private final Cluster cluster;
-
- /** Resolver from/to inner member to/from public one. */
- private final ScaleCubeMemberResolver scaleCubeMemberResolver;
-
- /** Storage of all handlers for execution. */
- private final MessageHandlerHolder messageHandlerHolder;
-
- /** Utility map for recognizing member for its address(scalecube doesn't provide such information in input message). */
- private final Map<Address, NetworkMember> addressMemberMap = new ConcurrentHashMap<>();
-
- /**
- * @param cluster Instance of scalecube cluster.
- * @param resolver Resolver from/to inner member to/from public one.
- * @param holder Storage of all handlers for execution.
- */
- public ScaleCubeMessageHandler(
- Cluster cluster,
- ScaleCubeMemberResolver resolver,
- MessageHandlerHolder holder
- ) {
- this.cluster = cluster;
- scaleCubeMemberResolver = resolver;
- messageHandlerHolder = holder;
- }
-
- /** {@inheritDoc} */
- @Override public void onMessage(Message message) {
- for (NetworkMessageHandler handler : messageHandlerHolder.messageHandlers()) {
- NetworkMessage msg = message.data();
- NetworkMember sender = memberForAddress(message.sender());
- handler.onReceived(msg, sender, message.correlationId());
- }
- }
-
- /**
- * @param address Inet address.
- * @return Network member corresponded to input address.
- */
- private NetworkMember memberForAddress(Address address) {
- return addressMemberMap.computeIfAbsent(address,
- (key) -> cluster
- .members().stream()
- .filter(mem -> mem.address().equals(address))
- .map(scaleCubeMemberResolver::resolveNetworkMember)
- .findFirst()
- .orElse(null)
- );
- }
-
- /** {@inheritDoc} */
- @Override public void onMembershipEvent(MembershipEvent event) {
- for (NetworkClusterEventHandler lsnr : messageHandlerHolder.clusterEventHandlers()) {
- if (event.type() == MembershipEvent.Type.ADDED)
- lsnr.onAppeared(scaleCubeMemberResolver.resolveNetworkMember(event.member()));
- else if (event.type() == MembershipEvent.Type.LEAVING || event.type() == MembershipEvent.Type.REMOVED)
- lsnr.onDisappeared((scaleCubeMemberResolver.resolveNetworkMember(event.member())));
- else if (event.type() == MembershipEvent.Type.UPDATED) {
- //do nothing.
- }
- else
- throw new RuntimeException("This event is not supported: event = " + event);
- }
- }
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
new file mode 100644
index 0000000..658de70
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import io.scalecube.cluster.Cluster;
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.net.Address;
+import org.apache.ignite.network.AbstractMessagingService;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.TopologyEventHandler;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Implementation of {@link MessagingService} based on ScaleCube.
+ */
+final class ScaleCubeMessagingService extends AbstractMessagingService {
+ /**
+ * Inner representation of a ScaleCube cluster.
+ */
+ private Cluster cluster;
+
+ /**
+ * Utility map for recognizing cluster members by their addresses.
+ */
+ private final Map<Address, ClusterNode> addressMemberMap = new ConcurrentHashMap<>();
+
+ /** */
+ ScaleCubeMessagingService(TopologyService topologyService) {
+ topologyService.addEventHandler(new TopologyEventHandler() {
+ @Override public void onAppeared(ClusterNode member) {
+ addressMemberMap.put(clusterNodeAddress(member), member);
+ }
+
+ @Override public void onDisappeared(ClusterNode member) {
+ addressMemberMap.remove(clusterNodeAddress(member));
+ }
+ });
+ }
+
+ /**
+ * Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency injection.
+ */
+ void setCluster(Cluster cluster) {
+ this.cluster = cluster;
+ }
+
+ /**
+ * Delegates the received message to the registered message handlers.
+ */
+ void fireEvent(Message message) {
+ NetworkMessage msg = message.data();
+ ClusterNode sender = addressMemberMap.get(message.sender());
+ String correlationId = message.correlationId();
+ for (NetworkMessageHandler handler : getMessageHandlers())
+ handler.onReceived(msg, sender, correlationId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void weakSend(ClusterNode recipient, NetworkMessage msg) {
+ cluster
+ .send(clusterNodeAddress(recipient), Message.fromData(msg))
+ .subscribe();
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
+ return cluster
+ .send(clusterNodeAddress(recipient), Message.fromData(msg))
+ .toFuture();
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg, String correlationId) {
+ var message = Message
+ .withData(msg)
+ .correlationId(correlationId)
+ .build();
+ return cluster
+ .send(clusterNodeAddress(recipient), message)
+ .toFuture();
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout) {
+ var message = Message
+ .withData(msg)
+ .correlationId(UUID.randomUUID().toString())
+ .build();
+ return cluster
+ .requestResponse(clusterNodeAddress(recipient), message)
+ .timeout(Duration.ofMillis(timeout))
+ .toFuture()
+ .thenApply(Message::data);
+ }
+
+ /**
+ * Extracts the given node's {@link Address}.
+ */
+ private static Address clusterNodeAddress(ClusterNode node) {
+ return Address.create(node.host(), node.port());
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
deleted file mode 100644
index 23ae537..0000000
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.network.scalecube;
-
-import io.scalecube.cluster.Cluster;
-import io.scalecube.cluster.transport.api.Message;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-import org.apache.ignite.network.MessageHandlerHolder;
-import org.apache.ignite.network.NetworkCluster;
-import org.apache.ignite.network.NetworkClusterEventHandler;
-import org.apache.ignite.network.NetworkHandlersProvider;
-import org.apache.ignite.network.NetworkMember;
-import org.apache.ignite.network.NetworkMessageHandler;
-import org.apache.ignite.network.message.NetworkMessage;
-
-import static io.scalecube.cluster.transport.api.Message.fromData;
-import static java.time.Duration.ofMillis;
-
-/**
- * Implementation of {@link NetworkCluster} based on ScaleCube.
- */
-public class ScaleCubeNetworkCluster implements NetworkCluster {
- /** Inner representation of cluster of scalecube. */
- private final Cluster cluster;
-
- /** Resolver for scalecube specific member. */
- private final ScaleCubeMemberResolver memberResolver;
-
- /** Holder of all cluster handlers. */
- private final MessageHandlerHolder messageHandlerHolder;
-
- /**
- * @param cluster Inner representation of cluster of scalecube.
- * @param memberResolver Resolver for scalecube specific member.
- * @param messageHandlerHolder Holder of all cluster handlers.
- */
- public ScaleCubeNetworkCluster(
- Cluster cluster,
- ScaleCubeMemberResolver memberResolver,
- MessageHandlerHolder messageHandlerHolder
- ) {
- this.messageHandlerHolder = messageHandlerHolder;
- this.cluster = cluster;
- this.memberResolver = memberResolver;
- }
-
- /** {@inheritDoc} */
- @Override public void shutdown() throws Exception {
- cluster.shutdown();
-
- cluster.onShutdown().block();
- }
-
- /** {@inheritDoc} */
- @Override public NetworkMember localMember() {
- return memberResolver.resolveNetworkMember(cluster.member());
- }
-
- /** {@inheritDoc} */
- @Override public Collection<NetworkMember> allMembers() {
- return cluster.members().stream()
- .map(memberResolver::resolveNetworkMember)
- .collect(Collectors.toList());
- }
-
- /** {@inheritDoc} */
- @Override public void weakSend(NetworkMember member, NetworkMessage msg) {
- cluster.send(memberResolver.resolveMember(member), fromNetworkMessage(msg))
- .subscribe();
- }
-
- /** {@inheritDoc} */
- @Override public Future<Void> send(NetworkMember member, NetworkMessage msg) {
- return cluster.send(memberResolver.resolveMember(member), fromData(msg)).toFuture();
- }
-
- @Override public Future<?> send(NetworkMember member, NetworkMessage msg, String corellationId) {
- return cluster.send(memberResolver.resolveMember(member),
- Message.withData(msg).correlationId(corellationId).build()
- ).toFuture();
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<NetworkMessage> invoke(NetworkMember member, NetworkMessage msg, long timeout) {
- return cluster.requestResponse(memberResolver.resolveMember(member),
- Message.withData(msg).correlationId(UUID.randomUUID().toString()).build())
- .timeout(ofMillis(timeout)).toFuture().thenApply(m -> m.data());
- }
-
- /** {@inheritDoc} */
- @Override public void addHandlersProvider(NetworkHandlersProvider networkHandlersProvider) {
- NetworkClusterEventHandler lsnr = networkHandlersProvider.clusterEventHandler();
-
- if (lsnr != null)
- messageHandlerHolder.addClusterEventHandlers(lsnr);
-
- NetworkMessageHandler messageHandler = networkHandlersProvider.messageHandler();
-
- if (messageHandler != null)
- messageHandlerHolder.addmessageHandlers(messageHandler);
- }
-
- /**
- * Create ScaleCube {@link Message} from {@link NetworkMessage}.
- * @param message Network message.
- * @return ScaleCube {@link Message}.
- */
- private Message fromNetworkMessage(NetworkMessage message) {
- return Message.builder()
- .data(message)
- .build();
- }
-
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkClusterFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkClusterFactory.java
deleted file mode 100644
index 87fdeff..0000000
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkClusterFactory.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.network.scalecube;
-
-import io.scalecube.cluster.Cluster;
-import io.scalecube.cluster.ClusterImpl;
-import io.scalecube.net.Address;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.ignite.network.MessageHandlerHolder;
-import org.apache.ignite.network.NetworkCluster;
-import org.apache.ignite.network.NetworkClusterContext;
-import org.apache.ignite.network.NetworkClusterFactory;
-import org.apache.ignite.network.NetworkConfigurationException;
-
-/**
- * Factory for ScaleCubeNetworkCluster.
- */
-public class ScaleCubeNetworkClusterFactory implements NetworkClusterFactory {
- /** Unique name of network member. */
- private final String localMemberName;
-
- /** Local port. */
- private final int localPort;
-
- /** Network addresses to find another members in cluster. */
- private final List<Address> addresses;
-
- /**
- * Member resolver which allows convert {@link org.apache.ignite.network.NetworkMember} to inner ScaleCube type
- * and otherwise.
- */
- private final ScaleCubeMemberResolver memberResolver;
-
- /**
- * @param localMemberName Unique name of network member.
- * @param port Local port.
- * @param addresses Network addresses to find another members in cluster.
- */
- public ScaleCubeNetworkClusterFactory(
- String localMemberName,
- int port,
- List<String> addresses,
- ScaleCubeMemberResolver memberResolver
- ) {
- this.localMemberName = localMemberName;
- this.localPort = port;
- this.addresses = addresses.stream().map(address -> {
- try {
- return Address.from(address);
- }
- catch (IllegalStateException e) {
- throw new NetworkConfigurationException("Failed to parse address", e);
- }
- }).collect(Collectors.toList());
- this.memberResolver = memberResolver;
- }
-
- /**
- * Start ScaleCube network cluster.
- *
- * @param memberResolver Member resolve which allows convert {@link org.apache.ignite.network.NetworkMember} to
- * inner ScaleCube type and otherwise.
- * @param messageHandlerHolder Holder of all cluster message handlers.
- * @return {@link NetworkCluster} instance.
- */
- @Override public NetworkCluster startCluster(NetworkClusterContext clusterContext) {
- MessageHandlerHolder handlerHolder = clusterContext.messageHandlerHolder();
-
- Cluster cluster = new ClusterImpl()
- .handler(cl -> {
- return new ScaleCubeMessageHandler(cl, memberResolver, handlerHolder);
- })
- .config(opts -> opts
- .memberAlias(localMemberName)
- .transport(trans -> {
- return trans.port(localPort);
- })
- )
- .membership(opts -> {
- return opts.seedMembers(addresses);
- })
- .startAwait();
-
- return new ScaleCubeNetworkCluster(cluster, memberResolver, handlerHolder);
- }
-
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
new file mode 100644
index 0000000..da69281
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network.scalecube;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+import io.scalecube.cluster.Cluster;
+import io.scalecube.cluster.Member;
+import io.scalecube.cluster.membership.MembershipEvent;
+import org.apache.ignite.network.AbstractTopologyService;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.TopologyEventHandler;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * Implementation of {@link TopologyService} based on ScaleCube.
+ */
+final class ScaleCubeTopologyService extends AbstractTopologyService {
+ /** Inner representation of a ScaleCube cluster. */
+ private Cluster cluster;
+
+ /**
+ * Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency injection.
+ */
+ void setCluster(Cluster cluster) {
+ this.cluster = cluster;
+ }
+
+ /**
+ * Delegates the received topology event to the registered event handlers.
+ */
+ void fireEvent(MembershipEvent event) {
+ ClusterNode member = fromMember(event.member());
+ for (TopologyEventHandler handler : getEventHandlers()) {
+ switch (event.type()) {
+ case ADDED:
+ handler.onAppeared(member);
+ break;
+ case LEAVING:
+ case REMOVED:
+ handler.onDisappeared(member);
+ break;
+ case UPDATED:
+ // do nothing
+ break;
+ default:
+ throw new RuntimeException("This event is not supported: event = " + event);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode localMember() {
+ return fromMember(cluster.member());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> allMembers() {
+ return cluster.members().stream()
+ .map(ScaleCubeTopologyService::fromMember)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Converts the given {@link Member} to a {@link ClusterNode}.
+ */
+ private static ClusterNode fromMember(Member member) {
+ return new ClusterNode(member.alias(), member.address().host(), member.address().port());
+ }
+}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java
index 33ecc25..6bb66f2 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/Peer.java
@@ -18,7 +18,7 @@
package org.apache.ignite.raft.client;
import java.io.Serializable;
-import org.apache.ignite.network.NetworkMember;
+import org.apache.ignite.network.ClusterNode;
/**
* A participant of a replication group.
@@ -27,7 +27,7 @@ public final class Peer implements Serializable {
/**
* Network node.
*/
- private final NetworkMember node;
+ private final ClusterNode node;
/**
* Peer's local priority value, if node don't support priority election,
@@ -46,7 +46,7 @@ public final class Peer implements Serializable {
/**
* @param node Node.
*/
- public Peer(NetworkMember node) {
+ public Peer(ClusterNode node) {
this(node, ElectionPriority.DISABLED);
}
@@ -54,7 +54,7 @@ public final class Peer implements Serializable {
* @param node Node.
* @param priority Election priority.
*/
- public Peer(NetworkMember node, int priority) {
+ public Peer(ClusterNode node, int priority) {
this.node = node;
this.priority = priority;
}
@@ -62,7 +62,7 @@ public final class Peer implements Serializable {
/**
* @return Node.
*/
- public NetworkMember getNode() {
+ public ClusterNode getNode() {
return this.node;
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
index 347dc39..d85ff3b 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
@@ -25,8 +25,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.network.NetworkCluster;
-import org.apache.ignite.network.NetworkMember;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.Peer;
@@ -41,12 +41,12 @@ import org.apache.ignite.raft.client.message.GetLeaderRequest;
import org.apache.ignite.raft.client.message.GetLeaderResponse;
import org.apache.ignite.raft.client.message.GetPeersRequest;
import org.apache.ignite.raft.client.message.GetPeersResponse;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
import org.apache.ignite.raft.client.message.RaftErrorResponse;
import org.apache.ignite.raft.client.message.RemoveLearnersRequest;
import org.apache.ignite.raft.client.message.RemovePeersRequest;
import org.apache.ignite.raft.client.message.SnapshotRequest;
import org.apache.ignite.raft.client.message.TransferLeadershipRequest;
-import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.jetbrains.annotations.NotNull;
@@ -80,7 +80,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
private volatile List<Peer> learners;
/** */
- private final NetworkCluster cluster;
+ private final ClusterService cluster;
/** */
private final long retryDelay;
@@ -100,7 +100,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
*/
public RaftGroupServiceImpl(
String groupId,
- NetworkCluster cluster,
+ ClusterService cluster,
RaftClientMessageFactory factory,
int timeout,
List<Peer> peers,
@@ -262,7 +262,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
@Override public CompletableFuture<Void> snapshot(Peer peer) {
SnapshotRequest req = factory.snapshotRequest().groupId(groupId).build();
- CompletableFuture<?> fut = cluster.invoke(peer.getNode(), req, timeout);
+ CompletableFuture<?> fut = cluster.messagingService().invoke(peer.getNode(), req, timeout);
return fut.thenApply(resp -> null);
}
@@ -276,7 +276,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
TransferLeadershipRequest req = factory.transferLeaderRequest().groupId(groupId).peer(newLeader).build();
- CompletableFuture<?> fut = cluster.invoke(newLeader.getNode(), req, timeout);
+ CompletableFuture<?> fut = cluster.messagingService().invoke(newLeader.getNode(), req, timeout);
return fut.thenApply(resp -> null);
}
@@ -299,15 +299,15 @@ public class RaftGroupServiceImpl implements RaftGroupService {
@Override public <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd) {
ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).build();
- CompletableFuture fut = cluster.invoke(peer.getNode(), req, timeout);
+ CompletableFuture<?> fut = cluster.messagingService().invoke(peer.getNode(), req, timeout);
- return fut.thenApply(resp -> ((ActionResponse) resp).result());
+ return fut.thenApply(resp -> ((ActionResponse<R>) resp).result());
}
- private <R> CompletableFuture<R> sendWithRetry(NetworkMember node, NetworkMessage req, long stopTime) {
+ private <R> CompletableFuture<R> sendWithRetry(ClusterNode node, NetworkMessage req, long stopTime) {
if (currentTimeMillis() >= stopTime)
return CompletableFuture.failedFuture(new TimeoutException());
- return cluster.invoke(node, req, timeout)
+ return cluster.messagingService().invoke(node, req, timeout)
.thenCompose(resp -> {
if (resp instanceof RaftErrorResponse) {
RaftErrorResponse resp0 = (RaftErrorResponse)resp;
@@ -343,7 +343,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
/**
* @return Random node.
*/
- private NetworkMember randomNode() {
+ private ClusterNode randomNode() {
List<Peer> peers0 = peers;
if (peers0 == null || peers0.isEmpty())
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
index 928b45d..cb93874 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
@@ -23,8 +23,9 @@ import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.network.NetworkCluster;
-import org.apache.ignite.network.NetworkMember;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.RaftErrorCode;
import org.apache.ignite.raft.client.WriteCommand;
@@ -40,9 +41,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.stubbing.Answer;
import static java.util.List.of;
import static java.util.concurrent.CompletableFuture.completedFuture;
@@ -56,6 +55,7 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.when;
/**
* Test methods of raft group service.
@@ -66,8 +66,11 @@ public class RaftGroupServiceTest {
private static final IgniteLogger LOG = IgniteLogger.forClass(RaftGroupServiceTest.class);
/** */
- private static final List<Peer> NODES = of(new Peer(new NetworkMember("node1")), new Peer(new NetworkMember("node2")),
- new Peer(new NetworkMember("node3")));
+ private static final List<Peer> NODES = of(
+ new Peer(new ClusterNode("node1", "foobar", 123)),
+ new Peer(new ClusterNode("node2", "foobar", 123)),
+ new Peer(new ClusterNode("node3", "foobar", 123))
+ );
/** */
private static final RaftClientMessageFactory FACTORY = new RaftClientMessageFactoryImpl();
@@ -81,15 +84,21 @@ public class RaftGroupServiceTest {
/** Retry delay. */
private static final int DELAY = 200;
- /** Cluster. */
+ /** Mock cluster. */
@Mock
- private NetworkCluster cluster;
+ private ClusterService cluster;
+
+ /** Mock messaging service. */
+ @Mock
+ private MessagingService messagingService;
/**
* @param testInfo Test info.
*/
@BeforeEach
void before(TestInfo testInfo) {
+ when(cluster.messagingService()).thenReturn(messagingService);
+
LOG.info(">>>> Starting test " + testInfo.getTestMethod().orElseThrow().getName());
}
@@ -100,7 +109,7 @@ public class RaftGroupServiceTest {
public void testRefreshLeaderStable() throws Exception {
String groupId = "test";
- mockLeaderRequest(cluster, false);
+ mockLeaderRequest(false);
RaftGroupService service =
new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
@@ -119,7 +128,7 @@ public class RaftGroupServiceTest {
public void testRefreshLeaderNotElected() throws Exception {
String groupId = "test";
- mockLeaderRequest(cluster, false);
+ mockLeaderRequest(false);
// Simulate running elections.
leader = null;
@@ -146,7 +155,7 @@ public class RaftGroupServiceTest {
public void testRefreshLeaderElectedAfterDelay() throws Exception {
String groupId = "test";
- mockLeaderRequest(cluster, false);
+ mockLeaderRequest(false);
// Simulate running elections.
leader = null;
@@ -176,7 +185,7 @@ public class RaftGroupServiceTest {
public void testRefreshLeaderWithTimeout() throws Exception {
String groupId = "test";
- mockLeaderRequest(cluster, true);
+ mockLeaderRequest(true);
RaftGroupService service =
new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
@@ -198,8 +207,8 @@ public class RaftGroupServiceTest {
public void testUserRequestLeaderElected() throws Exception {
String groupId = "test";
- mockLeaderRequest(cluster, false);
- mockUserInput(cluster, false);
+ mockLeaderRequest(false);
+ mockUserInput(false);
RaftGroupService service =
new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
@@ -218,8 +227,8 @@ public class RaftGroupServiceTest {
public void testUserRequestLazyInitLeader() throws Exception {
String groupId = "test";
- mockLeaderRequest(cluster, false);
- mockUserInput(cluster, false);
+ mockLeaderRequest(false);
+ mockUserInput(false);
RaftGroupService service =
new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
@@ -240,8 +249,8 @@ public class RaftGroupServiceTest {
public void testUserRequestWithTimeout() throws Exception {
String groupId = "test";
- mockLeaderRequest(cluster, false);
- mockUserInput(cluster, true);
+ mockLeaderRequest(false);
+ mockUserInput(true);
RaftGroupService service =
new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY);
@@ -263,8 +272,8 @@ public class RaftGroupServiceTest {
public void testUserRequestLeaderNotElected() throws Exception {
String groupId = "test";
- mockLeaderRequest(cluster, false);
- mockUserInput(cluster, false);
+ mockLeaderRequest(false);
+ mockUserInput(false);
RaftGroupService service =
new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY);
@@ -294,8 +303,8 @@ public class RaftGroupServiceTest {
public void testUserRequestLeaderElectedAfterDelay() throws Exception {
String groupId = "test";
- mockLeaderRequest(cluster, false);
- mockUserInput(cluster, false);
+ mockLeaderRequest(false);
+ mockUserInput(false);
RaftGroupService service =
new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY);
@@ -330,8 +339,8 @@ public class RaftGroupServiceTest {
public void testUserRequestLeaderChanged() throws Exception {
String groupId = "test";
- mockLeaderRequest(cluster, false);
- mockUserInput(cluster, false);
+ mockLeaderRequest(false);
+ mockUserInput(false);
RaftGroupService service =
new RaftGroupServiceImpl(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY);
@@ -356,59 +365,61 @@ public class RaftGroupServiceTest {
}
/**
- * @param cluster The cluster.
* @param simulateTimeout {@code True} to simulate request timeout.
*/
- private void mockUserInput(NetworkCluster cluster, boolean simulateTimeout) {
- Mockito.doAnswer(new Answer() {
- @Override public Object answer(InvocationOnMock invocation) throws Throwable {
- NetworkMember target = invocation.getArgument(0);
-
- if (simulateTimeout)
- return failedFuture(new TimeoutException());
-
- Object resp;
-
- if (leader == null)
- resp = FACTORY.raftErrorResponse().errorCode(RaftErrorCode.NO_LEADER).build();
- else if (target != leader.getNode())
- resp = FACTORY.raftErrorResponse().errorCode(RaftErrorCode.LEADER_CHANGED).newLeader(leader).build();
- else
- resp = FACTORY.actionResponse().result(new TestResponse()).build();
-
- return completedFuture(resp);
- }
- }).when(cluster).invoke(any(), argThat(new ArgumentMatcher<ActionRequest>() {
- @Override public boolean matches(ActionRequest arg) {
- return arg.command() instanceof TestCommand;
- }
- }), anyLong());
+ private void mockUserInput(boolean simulateTimeout) {
+ Mockito.doAnswer(invocation -> {
+ ClusterNode target = invocation.getArgument(0);
+
+ if (simulateTimeout)
+ return failedFuture(new TimeoutException());
+
+ Object resp;
+
+ if (leader == null)
+ resp = FACTORY.raftErrorResponse().errorCode(RaftErrorCode.NO_LEADER).build();
+ else if (target != leader.getNode())
+ resp = FACTORY.raftErrorResponse().errorCode(RaftErrorCode.LEADER_CHANGED).newLeader(leader).build();
+ else
+ resp = FACTORY.actionResponse().result(new TestResponse()).build();
+
+ return completedFuture(resp);
+ })
+ .when(messagingService)
+ .invoke(
+ any(),
+ argThat(new ArgumentMatcher<ActionRequest>() {
+ @Override public boolean matches(ActionRequest arg) {
+ return arg.command() instanceof TestCommand;
+ }
+ }),
+ anyLong()
+ );
}
/**
- * @param cluster The cluster.
* @param simulateTimeout {@code True} to simulate request timeout.
*/
- private void mockLeaderRequest(NetworkCluster cluster, boolean simulateTimeout) {
- Mockito.doAnswer(new Answer() {
- @Override public Object answer(InvocationOnMock invocation) throws Throwable {
- if (simulateTimeout)
- return failedFuture(new TimeoutException());
+ private void mockLeaderRequest(boolean simulateTimeout) {
+ Mockito.doAnswer(invocation -> {
+ if (simulateTimeout)
+ return failedFuture(new TimeoutException());
- Object resp;
+ Object resp;
- Peer leader0 = leader;
+ Peer leader0 = leader;
- if (leader0 == null) {
- resp = FACTORY.raftErrorResponse().errorCode(RaftErrorCode.NO_LEADER).build();
- }
- else {
- resp = FACTORY.getLeaderResponse().leader(leader0).build();
- }
-
- return completedFuture(resp);
+ if (leader0 == null) {
+ resp = FACTORY.raftErrorResponse().errorCode(RaftErrorCode.NO_LEADER).build();
+ }
+ else {
+ resp = FACTORY.getLeaderResponse().leader(leader0).build();
}
- }).when(cluster).invoke(any(), any(GetLeaderRequest.class), anyLong());
+
+ return completedFuture(resp);
+ })
+ .when(messagingService)
+ .invoke(any(), any(GetLeaderRequest.class), anyLong());
}
/** */
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
index 5a96abc..d24d6f9 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
@@ -20,10 +20,11 @@ package org.apache.ignite.raft.server;
import java.util.List;
import java.util.Map;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.network.Network;
-import org.apache.ignite.network.NetworkCluster;
-import org.apache.ignite.network.scalecube.ScaleCubeMemberResolver;
-import org.apache.ignite.network.scalecube.ScaleCubeNetworkClusterFactory;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
import org.apache.ignite.raft.client.message.impl.RaftClientMessageFactoryImpl;
@@ -47,10 +48,25 @@ class ITRaftCounterServerTest {
/** */
private static final RaftClientMessageFactory FACTORY = new RaftClientMessageFactoryImpl();
+ /** Network factory. */
+ private static final ClusterServiceFactory NETWORK_FACTORY = new ScaleCubeClusterServiceFactory();
+
+ /** */
+ // TODO: IGNITE-14088: Uncomment and use real serializer provider
+ private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
+// .registerFactory((short)1000, ???)
+// .registerFactory((short)1001, ???)
+// .registerFactory((short)1005, ???)
+// .registerFactory((short)1006, ???)
+// .registerFactory((short)1009, ???);
+
/** */
private RaftServer server;
/** */
+ private ClusterService client;
+
+ /** */
private static final String SERVER_ID = "testServer";
/** */
@@ -66,7 +82,7 @@ class ITRaftCounterServerTest {
* @param testInfo Test info.
*/
@BeforeEach
- void before(TestInfo testInfo) throws Exception {
+ void before(TestInfo testInfo) {
LOG.info(">>>> Starting test " + testInfo.getTestMethod().orElseThrow().getName());
server = new RaftServerImpl(SERVER_ID,
@@ -74,6 +90,10 @@ class ITRaftCounterServerTest {
FACTORY,
1000,
Map.of(COUNTER_GROUP_ID_0, new CounterCommandListener(), COUNTER_GROUP_ID_1, new CounterCommandListener()));
+
+ client = startClient(CLIENT_ID, 20101, List.of("localhost:20100"));
+
+ assertTrue(waitForTopology(client, 2, 1000));
}
/**
@@ -82,18 +102,14 @@ class ITRaftCounterServerTest {
@AfterEach
void after() throws Exception {
server.shutdown();
+ client.shutdown();
}
/**
- * @throws Exception
*/
@Test
- public void testRefreshLeader() throws Exception {
- NetworkCluster client = startClient(CLIENT_ID, 20101, List.of("localhost:20100"));
-
- assertTrue(waitForTopology(client, 2, 1000));
-
- Peer server = new Peer(client.allMembers().stream().filter(m -> SERVER_ID.equals(m.name())).findFirst().orElseThrow());
+ public void testRefreshLeader() {
+ Peer server = new Peer(client.topologyService().allMembers().stream().filter(m -> SERVER_ID.equals(m.name())).findFirst().orElseThrow());
RaftGroupService service = new RaftGroupServiceImpl(COUNTER_GROUP_ID_0, client, FACTORY, 1000,
List.of(server), true, 200);
@@ -102,8 +118,6 @@ class ITRaftCounterServerTest {
assertNotNull(leader);
assertEquals(server.getNode().name(), leader.getNode().name());
-
- client.shutdown();
}
/**
@@ -111,11 +125,7 @@ class ITRaftCounterServerTest {
*/
@Test
public void testCounterCommandListener() throws Exception {
- NetworkCluster client = startClient(CLIENT_ID, 20101, List.of("localhost:20100"));
-
- assertTrue(waitForTopology(client, 2, 1000));
-
- Peer server = new Peer(client.allMembers().stream().filter(m -> SERVER_ID.equals(m.name())).findFirst().orElseThrow());
+ Peer server = new Peer(client.topologyService().allMembers().stream().filter(m -> SERVER_ID.equals(m.name())).findFirst().orElseThrow());
RaftGroupService service0 = new RaftGroupServiceImpl(COUNTER_GROUP_ID_0, client, FACTORY, 1000,
List.of(server), true, 200);
@@ -135,8 +145,6 @@ class ITRaftCounterServerTest {
assertEquals(4, service1.<Integer>run(new GetValueCommand()).get());
assertEquals(7, service1.<Integer>run(new IncrementAndGetCommand(3)).get());
assertEquals(7, service1.<Integer>run(new GetValueCommand()).get());
-
- client.shutdown();
}
/**
@@ -145,19 +153,11 @@ class ITRaftCounterServerTest {
* @param servers Server nodes of the cluster.
* @return The client cluster view.
*/
- private NetworkCluster startClient(String name, int port, List<String> servers) {
- Network network = new Network(
- new ScaleCubeNetworkClusterFactory(name, port, servers, new ScaleCubeMemberResolver())
- );
-
- // TODO: IGNITE-14088: Uncomment and use real serializer provider
-// network.registerMessageMapper((short)1000, new DefaultMessageMapperProvider());
-// network.registerMessageMapper((short)1001, new DefaultMessageMapperProvider());
-// network.registerMessageMapper((short)1005, new DefaultMessageMapperProvider());
-// network.registerMessageMapper((short)1006, new DefaultMessageMapperProvider());
-// network.registerMessageMapper((short)1009, new DefaultMessageMapperProvider());
-
- return network.start();
+ private ClusterService startClient(String name, int port, List<String> servers) {
+ var context = new ClusterLocalConfiguration(name, port, servers, SERIALIZATION_REGISTRY);
+ var network = NETWORK_FACTORY.createClusterService(context);
+ network.start();
+ return network;
}
/**
@@ -166,11 +166,11 @@ class ITRaftCounterServerTest {
* @param timeout The timeout in millis.
* @return {@code True} if topology size is equal to expected.
*/
- private boolean waitForTopology(NetworkCluster cluster, int expected, int timeout) {
+ private boolean waitForTopology(ClusterService cluster, int expected, int timeout) {
long stop = System.currentTimeMillis() + timeout;
while(System.currentTimeMillis() < stop) {
- if (cluster.allMembers().size() >= expected)
+ if (cluster.topologyService().allMembers().size() >= expected)
return true;
try {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/server/RaftServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/server/RaftServer.java
index d6bcefd..f646780 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/server/RaftServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/server/RaftServer.java
@@ -17,7 +17,7 @@
package org.apache.ignite.raft.server;
-import org.apache.ignite.network.NetworkMember;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.raft.client.service.RaftGroupCommandListener;
/**
@@ -32,7 +32,7 @@ public interface RaftServer {
/**
* @return Local member.
*/
- NetworkMember localMember();
+ ClusterNode localMember();
/**
* Set a listener for group commands.
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java
index 31c0f2d..943e539 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java
@@ -27,14 +27,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.network.Network;
-import org.apache.ignite.network.NetworkCluster;
-import org.apache.ignite.network.NetworkHandlersProvider;
-import org.apache.ignite.network.NetworkMember;
-import org.apache.ignite.network.NetworkMessageHandler;
-import org.apache.ignite.network.message.NetworkMessage;
-import org.apache.ignite.network.scalecube.ScaleCubeMemberResolver;
-import org.apache.ignite.network.scalecube.ScaleCubeNetworkClusterFactory;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.RaftErrorCode;
@@ -68,7 +65,7 @@ public class RaftServerImpl implements RaftServer {
private final RaftClientMessageFactory clientMsgFactory;
/** */
- private final NetworkCluster server;
+ private final ClusterService server;
/** */
private final ConcurrentMap<String, RaftGroupCommandListener> listeners = new ConcurrentHashMap<>();
@@ -109,63 +106,58 @@ public class RaftServerImpl implements RaftServer {
if (listeners != null)
this.listeners.putAll(listeners);
- readQueue = new ArrayBlockingQueue<CommandClosureEx<ReadCommand>>(queueSize);
- writeQueue = new ArrayBlockingQueue<CommandClosureEx<WriteCommand>>(queueSize);
-
- Network network = new Network(
- new ScaleCubeNetworkClusterFactory(id, localPort, List.of(), new ScaleCubeMemberResolver())
- );
-
- // TODO: IGNITE-14088: Uncomment and use real serializer provider
-// network.registerMessageMapper((short)1000, new DefaultMessageMapperProvider());
-// network.registerMessageMapper((short)1001, new DefaultMessageMapperProvider());
-// network.registerMessageMapper((short)1005, new DefaultMessageMapperProvider());
-// network.registerMessageMapper((short)1006, new DefaultMessageMapperProvider());
-// network.registerMessageMapper((short)1009, new DefaultMessageMapperProvider());
-
- server = network.start();
-
- server.addHandlersProvider(new NetworkHandlersProvider() {
- @Override public NetworkMessageHandler messageHandler() {
- return new NetworkMessageHandler() {
- @Override public void onReceived(NetworkMessage req, NetworkMember sender, String corellationId) {
-
- if (req instanceof GetLeaderRequest) {
- GetLeaderResponse resp = clientMsgFactory.getLeaderResponse().leader(new Peer(server.localMember())).build();
-
- server.send(sender, resp, corellationId);
- }
- else if (req instanceof ActionRequest) {
- ActionRequest req0 = (ActionRequest) req;
-
- RaftGroupCommandListener lsnr = listeners.get(req0.groupId());
-
- if (lsnr == null) {
- sendError(sender, corellationId, RaftErrorCode.ILLEGAL_STATE);
-
- return;
- }
-
- if (req0.command() instanceof ReadCommand) {
- handleActionRequest(sender, req0, corellationId, readQueue, lsnr);
- }
- else {
- handleActionRequest(sender, req0, corellationId, writeQueue, lsnr);
- }
- }
- else {
- LOG.warn("Unsupported message class " + req.getClass().getName());
- }
- }
- };
+ readQueue = new ArrayBlockingQueue<>(queueSize);
+ writeQueue = new ArrayBlockingQueue<>(queueSize);
+
+ // TODO: IGNITE-14088: Uncomment and use real serializer factory
+ var serializationRegistry = new MessageSerializationRegistry();
+// .registerFactory((short)1000, ???)
+// .registerFactory((short)1001, ???)
+// .registerFactory((short)1005, ???)
+// .registerFactory((short)1006, ???)
+// .registerFactory((short)1009, ???);
+
+ var context = new ClusterLocalConfiguration(id, localPort, List.of(), serializationRegistry);
+ var factory = new ScaleCubeClusterServiceFactory();
+
+ server = factory.createClusterService(context);
+
+ server.messagingService().addMessageHandler((message, sender, correlationId) -> {
+ if (message instanceof GetLeaderRequest) {
+ GetLeaderResponse resp = clientMsgFactory.getLeaderResponse().leader(new Peer(server.topologyService().localMember())).build();
+
+ server.messagingService().send(sender, resp, correlationId);
+ }
+ else if (message instanceof ActionRequest) {
+ ActionRequest<?> req0 = (ActionRequest<?>) message;
+
+ RaftGroupCommandListener lsnr = listeners.get(req0.groupId());
+
+ if (lsnr == null) {
+ sendError(sender, correlationId, RaftErrorCode.ILLEGAL_STATE);
+
+ return;
+ }
+
+ if (req0.command() instanceof ReadCommand) {
+ handleActionRequest(sender, req0, correlationId, readQueue, lsnr);
+ }
+ else {
+ handleActionRequest(sender, req0, correlationId, writeQueue, lsnr);
+ }
+ }
+ else {
+ LOG.warn("Unsupported message class " + message.getClass().getName());
}
});
- readWorker = new Thread(() -> processQueue(readQueue, (l, i) -> l.onRead(i)), "read-cmd-worker#" + id);
+ server.start();
+
+ readWorker = new Thread(() -> processQueue(readQueue, RaftGroupCommandListener::onRead), "read-cmd-worker#" + id);
readWorker.setDaemon(true);
readWorker.start();
- writeWorker = new Thread(() -> processQueue(writeQueue, (l, i) -> l.onWrite(i)), "write-cmd-worker#" + id);
+ writeWorker = new Thread(() -> processQueue(writeQueue, RaftGroupCommandListener::onWrite), "write-cmd-worker#" + id);
writeWorker.setDaemon(true);
writeWorker.start();
@@ -173,8 +165,8 @@ public class RaftServerImpl implements RaftServer {
}
/** {@inheritDoc} */
- @Override public NetworkMember localMember() {
- return server.localMember();
+ @Override public ClusterNode localMember() {
+ return server.topologyService().localMember();
}
/** {@inheritDoc} */
@@ -201,23 +193,24 @@ public class RaftServerImpl implements RaftServer {
}
private <T extends Command> void handleActionRequest(
- NetworkMember sender,
- ActionRequest req,
+ ClusterNode sender,
+ ActionRequest<?> req,
String corellationId,
BlockingQueue<CommandClosureEx<T>> queue,
RaftGroupCommandListener lsnr
) {
- if (!queue.offer(new CommandClosureEx<T>() {
+ if (!queue.offer(new CommandClosureEx<>() {
@Override public RaftGroupCommandListener listener() {
return lsnr;
}
@Override public T command() {
- return (T) req.command();
+ return (T)req.command();
}
@Override public void success(Object res) {
- server.send(sender, clientMsgFactory.actionResponse().result(res).build(), corellationId);
+ var msg = clientMsgFactory.actionResponse().result(res).build();
+ server.messagingService().send(sender, msg, corellationId);
}
@Override public void failure(Throwable t) {
@@ -226,8 +219,6 @@ public class RaftServerImpl implements RaftServer {
})) {
// Queue out of capacity.
sendError(sender, corellationId, RaftErrorCode.BUSY);
-
- return;
}
}
@@ -252,10 +243,10 @@ public class RaftServerImpl implements RaftServer {
}
}
- private void sendError(NetworkMember sender, String corellationId, RaftErrorCode errorCode) {
+ private void sendError(ClusterNode sender, String corellationId, RaftErrorCode errorCode) {
RaftErrorResponse resp = clientMsgFactory.raftErrorResponse().errorCode(errorCode).build();
- server.send(sender, resp, corellationId);
+ server.messagingService().send(sender, resp, corellationId);
}
private interface CommandClosureEx<T extends Command> extends CommandClosure<T> {