You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/07/20 10:47:30 UTC
[ignite-3] branch main updated: IGNITE-14983 Enable registering message listeners for message groups (#229)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3adc3d7 IGNITE-14983 Enable registering message listeners for message groups (#229)
3adc3d7 is described below
commit 3adc3d7f167d2e7a2548211867e5d498d5cec7e3
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Tue Jul 20 13:47:21 2021 +0300
IGNITE-14983 Enable registering message listeners for message groups (#229)
---
.../matchers}/CompletableFutureMatcher.java | 9 +-
.../internal/metastorage/MetaStorageManager.java | 2 +-
.../ignite/network/AbstractMessagingService.java | 48 +++++++++--
.../apache/ignite/network/MessagingService.java | 8 +-
modules/network/pom.xml | 7 ++
.../scalecube/ITScaleCubeNetworkMessagingTest.java | 95 +++++++++++++++++++++-
.../scalecube/ScaleCubeMessagingService.java | 2 +-
.../MessageSerializationRegistryImplTest.java | 28 ++++---
.../apache/ignite/network/TestMessageTypes.java | 3 +
.../internal/raft/server/impl/RaftServerImpl.java | 44 +++++-----
.../apache/ignite/raft/jraft/rpc/RpcServer.java | 5 +-
.../raft/jraft/rpc/impl/IgniteRpcServer.java | 54 ++++++++----
.../ignite/raft/jraft/rpc/TestIgniteRpcServer.java | 4 +
.../PersistencePropertiesVaultServiceTest.java | 2 +-
.../ignite/internal/vault/VaultManagerTest.java | 2 +-
.../ignite/internal/vault/VaultServiceTest.java | 2 +-
16 files changed, 245 insertions(+), 70 deletions(-)
diff --git a/modules/vault/src/test/java/org/apache/ignite/internal/vault/CompletableFutureMatcher.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
similarity index 87%
rename from modules/vault/src/test/java/org/apache/ignite/internal/vault/CompletableFutureMatcher.java
rename to modules/core/src/test/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
index ecff5bc..bfb958c 100644
--- a/modules/vault/src/test/java/org/apache/ignite/internal/vault/CompletableFutureMatcher.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.vault;
+package org.apache.ignite.internal.testframework.matchers;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -58,6 +58,13 @@ public class CompletableFutureMatcher<T> extends TypeSafeMatcher<CompletableFutu
description.appendText("is ").appendDescriptionOf(matcher);
}
+ /** {@inheritDoc} */
+ @Override protected void describeMismatchSafely(CompletableFuture<T> item, Description mismatchDescription) {
+ Object valueDescription = item.isDone() ? item.join() : item;
+
+ mismatchDescription.appendText("was ").appendValue(valueDescription);
+ }
+
/**
* Factory method.
*
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 697b213..a2af938 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -174,7 +174,7 @@ public class MetaStorageManager {
// );
// TODO: IGNITE-14414 Cluster initialization flow. Here we should complete metaStorageServiceFuture.
- clusterNetSvc.messagingService().addMessageHandler((message, senderAddr, correlationId) -> {});
+// clusterNetSvc.messagingService().addMessageHandler((message, senderAddr, correlationId) -> {});
}
/**
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/AbstractMessagingService.java b/modules/network-api/src/main/java/org/apache/ignite/network/AbstractMessagingService.java
index 61046da..33df0ed 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/AbstractMessagingService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/AbstractMessagingService.java
@@ -17,26 +17,58 @@
package org.apache.ignite.network;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.ignite.network.annotations.MessageGroup;
/**
* Base class for {@link MessagingService} implementations.
*/
public abstract class AbstractMessagingService implements MessagingService {
- /** */
- private final Collection<NetworkMessageHandler> messageHandlers = new CopyOnWriteArrayList<>();
+ /** Mapping from group type (array index) to a list of registered message handlers. */
+ private final AtomicReferenceArray<List<NetworkMessageHandler>> handlersByGroupType =
+ new AtomicReferenceArray<>(Short.MAX_VALUE + 1);
/** {@inheritDoc} */
- @Override public void addMessageHandler(NetworkMessageHandler handler) {
- messageHandlers.add(handler);
+ @Override public void addMessageHandler(Class<?> messageGroup, NetworkMessageHandler handler) {
+ handlersByGroupType.getAndUpdate(getMessageGroupType(messageGroup), handlers -> {
+ if (handlers == null)
+ return List.of(handler);
+
+ var result = new ArrayList<NetworkMessageHandler>(handlers.size() + 1);
+
+ result.addAll(handlers);
+ result.add(handler);
+
+ return result;
+ });
+ }
+
+ /**
+ * Extracts the message group ID from a class annotated with {@link MessageGroup}.
+ */
+ private static short getMessageGroupType(Class<?> messageGroup) {
+ MessageGroup annotation = messageGroup.getAnnotation(MessageGroup.class);
+
+ assert annotation != null : "No MessageGroup annotation present on " + messageGroup;
+
+ short groupType = annotation.groupType();
+
+ assert groupType >= 0 : "Group type must not be negative";
+
+ return groupType;
}
/**
* @return registered message handlers.
*/
- public Collection<NetworkMessageHandler> getMessageHandlers() {
- return Collections.unmodifiableCollection(messageHandlers);
+ protected final Collection<NetworkMessageHandler> getMessageHandlers(short groupType) {
+ assert groupType >= 0 : "Group type must not be negative";
+
+ List<NetworkMessageHandler> result = handlersByGroupType.get(groupType);
+
+ return result == null ? List.of() : result;
}
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
index 56e0b77..c524856 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
@@ -18,6 +18,7 @@
package org.apache.ignite.network;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.network.annotations.MessageGroup;
/**
* Entry point for sending messages between network members in both weak and patient mode.
@@ -90,9 +91,12 @@ public interface MessagingService {
CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, NetworkMessage msg, long timeout);
/**
- * Registers a handler for network message events.
+ * Registers a listener for a group of network message events.
+ * <p>
+ * Message group is specified by providing a class annotated with the {@link MessageGroup} annotation.
*
+ * @param messageGroup Message group descriptor.
* @param handler Message handler.
*/
- void addMessageHandler(NetworkMessageHandler handler);
+ void addMessageHandler(Class<?> messageGroup, NetworkMessageHandler handler);
}
diff --git a/modules/network/pom.xml b/modules/network/pom.xml
index cedd484..0fd718b 100644
--- a/modules/network/pom.xml
+++ b/modules/network/pom.xml
@@ -67,6 +67,13 @@
<!-- Test dependencies -->
<dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index 058a013..a2c78da 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.ignite.network.scalecube;
+import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collection;
@@ -29,21 +30,27 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.transport.api.Transport;
+import org.apache.ignite.internal.network.NetworkMessageTypes;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.TestMessage;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.TestMessageTypes;
import org.apache.ignite.network.TestMessagesFactory;
import org.apache.ignite.network.TopologyEventHandler;
+import org.apache.ignite.network.annotations.MessageGroup;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -79,6 +86,7 @@ class ITScaleCubeNetworkMessagingTest {
for (ClusterService member : testCluster.members) {
member.messagingService().addMessageHandler(
+ TestMessageTypes.class,
(message, senderAddr, correlationId) -> {
messageStorage.put(member.localConfiguration().getName(), (TestMessage)message);
messageReceivedLatch.countDown();
@@ -106,6 +114,7 @@ class ITScaleCubeNetworkMessagingTest {
/**
* Tests graceful shutdown.
+ *
* @throws Exception If failed.
*/
@Test
@@ -151,6 +160,7 @@ class ITScaleCubeNetworkMessagingTest {
var dataFuture = new CompletableFuture<Data>();
member.messagingService().addMessageHandler(
+ TestMessageTypes.class,
(message, senderAddr, correlationId) ->
dataFuture.complete(new Data((TestMessage)message, senderAddr, correlationId))
);
@@ -182,10 +192,13 @@ class ITScaleCubeNetworkMessagingTest {
var requestMessage = messageFactory.testMessage().msg("request").build();
var responseMessage = messageFactory.testMessage().msg("response").build();
- member.messagingService().addMessageHandler((message, senderAddr, correlationId) -> {
- if (message.equals(requestMessage))
- member.messagingService().send(self, responseMessage, correlationId);
- });
+ member.messagingService().addMessageHandler(
+ TestMessageTypes.class,
+ (message, senderAddr, correlationId) -> {
+ if (message.equals(requestMessage))
+ member.messagingService().send(self, responseMessage, correlationId);
+ }
+ );
TestMessage actualResponseMessage = member.messagingService()
.invoke(self, requestMessage, 1000)
@@ -196,7 +209,79 @@ class ITScaleCubeNetworkMessagingTest {
}
/**
+ * Serializable message that belongs to the {@link NetworkMessageTypes} message group.
+ */
+ private static class MockNetworkMessage implements NetworkMessage, Serializable {
+ /** {@inheritDoc} */
+ @Override public short messageType() {
+ return 666;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short groupType() {
+ return NetworkMessageTypes.class.getAnnotation(MessageGroup.class).groupType();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ return getClass() == obj.getClass();
+ }
+ }
+
+ /**
+ * Tests that messages from different message groups can be delivered to different sets of handlers.
+ */
+ @Test
+ public void testMessageGroupsHandlers() throws Exception {
+ testCluster = new Cluster(2);
+ testCluster.startAwait();
+
+ ClusterService node1 = testCluster.members.get(0);
+ ClusterService node2 = testCluster.members.get(1);
+
+ var testMessageFuture1 = new CompletableFuture<NetworkMessage>();
+ var testMessageFuture2 = new CompletableFuture<NetworkMessage>();
+ var networkMessageFuture = new CompletableFuture<NetworkMessage>();
+
+ // register multiple handlers for the same group
+ node1.messagingService().addMessageHandler(
+ TestMessageTypes.class,
+ (message, senderAddr, correlationId) -> assertTrue(testMessageFuture1.complete(message))
+ );
+
+ node1.messagingService().addMessageHandler(
+ TestMessageTypes.class,
+ (message, senderAddr, correlationId) -> assertTrue(testMessageFuture2.complete(message))
+ );
+
+ // register a different handle for the second group
+ node1.messagingService().addMessageHandler(
+ NetworkMessageTypes.class,
+ (message, senderAddr, correlationId) -> assertTrue(networkMessageFuture.complete(message))
+ );
+
+ var testMessage = messageFactory.testMessage().msg("foo").build();
+
+ var networkMessage = new MockNetworkMessage();
+
+ // test that a message gets delivered to both handlers
+ node2.messagingService()
+ .send(node1.topologyService().localMember(), testMessage)
+ .get(1, TimeUnit.SECONDS);
+
+ // test that a message from the other group is only delivered to a single handler
+ node2.messagingService()
+ .send(node1.topologyService().localMember(), networkMessage)
+ .get(1, TimeUnit.SECONDS);
+
+ assertThat(testMessageFuture1, willBe(equalTo(testMessage)));
+ assertThat(testMessageFuture2, willBe(equalTo(testMessage)));
+ assertThat(networkMessageFuture, willBe(equalTo(networkMessage)));
+ }
+
+ /**
* Tests shutdown.
+ *
* @param forceful Whether shutdown should be forceful.
* @throws Exception If failed.
*/
@@ -238,6 +323,7 @@ class ITScaleCubeNetworkMessagingTest {
/**
* Find the cluster's transport and force it to stop.
+ *
* @param cluster Cluster to be shutdown.
* @throws Exception If failed to stop.
*/
@@ -319,6 +405,7 @@ class ITScaleCubeNetworkMessagingTest {
/**
* Start and wait for cluster to come up.
+ *
* @throws InterruptedException If failed.
*/
void startAwait() throws InterruptedException {
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index 9910eb7..cb9e185 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -60,7 +60,7 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
String correlationId = message.correlationId();
- for (NetworkMessageHandler handler : getMessageHandlers())
+ for (NetworkMessageHandler handler : getMessageHandlers(msg.groupType()))
handler.onReceived(msg, address, correlationId);
}
diff --git a/modules/network/src/test/java/org/apache/ignite/network/MessageSerializationRegistryImplTest.java b/modules/network/src/test/java/org/apache/ignite/network/MessageSerializationRegistryImplTest.java
index b7e40fa..105911f 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/MessageSerializationRegistryImplTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/MessageSerializationRegistryImplTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.network;
import org.apache.ignite.network.serialization.MessageDeserializer;
import org.apache.ignite.network.serialization.MessageSerializationFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.network.serialization.MessageSerializer;
import org.junit.jupiter.api.Test;
@@ -30,13 +31,14 @@ import static org.mockito.Mockito.mock;
* {@link MessageSerializationRegistryImpl} tests.
*/
public class MessageSerializationRegistryImplTest {
+ /** Default registry implementation. */
+ private final MessageSerializationRegistry registry = new MessageSerializationRegistryImpl();
+
/**
* Tests that a serialization factory can be registered.
*/
@Test
public void testRegisterFactory() {
- var registry = new MessageSerializationRegistryImpl();
-
registry.registerFactory(Msg.GROUP_TYPE, Msg.TYPE, new MsgSerializationFactory());
}
@@ -46,8 +48,6 @@ public class MessageSerializationRegistryImplTest {
*/
@Test
public void testRegisterFactoryWithSameType() {
- var registry = new MessageSerializationRegistryImpl();
-
registry.registerFactory(Msg.GROUP_TYPE, Msg.TYPE, new MsgSerializationFactory());
assertThrows(
@@ -62,8 +62,6 @@ public class MessageSerializationRegistryImplTest {
*/
@Test
public void testRegisterFactoryWithSameTypeDifferentModule() {
- var registry = new MessageSerializationRegistryImpl();
-
registry.registerFactory(Msg.GROUP_TYPE, Msg.TYPE, new MsgSerializationFactory());
short nextGroupType = Msg.GROUP_TYPE + 1;
@@ -80,8 +78,6 @@ public class MessageSerializationRegistryImplTest {
*/
@Test
public void testCreateSerializers() {
- var registry = new MessageSerializationRegistryImpl();
-
registry.registerFactory(Msg.GROUP_TYPE, Msg.TYPE, new MsgSerializationFactory());
assertNotNull(registry.createSerializer(Msg.GROUP_TYPE, Msg.TYPE));
@@ -94,12 +90,24 @@ public class MessageSerializationRegistryImplTest {
*/
@Test
public void testCreateSerializersIfNotRegistered() {
- var registry = new MessageSerializationRegistryImpl();
-
assertThrows(NetworkConfigurationException.class, () -> registry.createSerializer(Msg.GROUP_TYPE, Msg.TYPE));
assertThrows(NetworkConfigurationException.class, () -> registry.createDeserializer(Msg.GROUP_TYPE, Msg.TYPE));
}
+ /**
+ * Tests that edge values of group and message types are handled without out-of-bound errors.
+ */
+ @Test
+ public void testEdgeValues() {
+ registry.registerFactory((short)0, (short)0, new MsgSerializationFactory());
+
+ assertNotNull(registry.createSerializer((short)0, (short)0));
+
+ registry.registerFactory(Short.MAX_VALUE, Short.MAX_VALUE, new MsgSerializationFactory());
+
+ assertNotNull(registry.createSerializer(Short.MAX_VALUE, Short.MAX_VALUE));
+ }
+
/** */
private static class Msg implements NetworkMessage {
/** */
diff --git a/modules/network/src/test/java/org/apache/ignite/network/TestMessageTypes.java b/modules/network/src/test/java/org/apache/ignite/network/TestMessageTypes.java
index 94ba495..d00ab5b 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/TestMessageTypes.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/TestMessageTypes.java
@@ -21,9 +21,12 @@ import org.apache.ignite.network.annotations.MessageGroup;
@MessageGroup(groupName = "TestMessages", groupType = 2)
public class TestMessageTypes {
+ /** */
public static final short ALL_TYPES = 1;
+ /** */
public static final short TEST = 2;
+ /** */
public static final short NESTED_MESSAGE = 3;
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
index 0386379..699bdd4 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
@@ -38,8 +38,9 @@ import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.message.ActionRequest;
import org.apache.ignite.raft.client.message.GetLeaderRequest;
import org.apache.ignite.raft.client.message.GetLeaderResponse;
-import org.apache.ignite.raft.client.message.RaftErrorResponse;
+import org.apache.ignite.raft.client.message.RaftClientMessageGroup;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.client.message.RaftErrorResponse;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupListener;
import org.jetbrains.annotations.Nullable;
@@ -90,32 +91,35 @@ public class RaftServerImpl implements RaftServer {
readQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
writeQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
- service.messagingService().addMessageHandler((message, senderAddr, correlationId) -> {
- if (message instanceof GetLeaderRequest) {
- var localPeer = new Peer(service.topologyService().localMember().address());
+ service.messagingService().addMessageHandler(
+ RaftClientMessageGroup.class,
+ (message, senderAddr, correlationId) -> {
+ if (message instanceof GetLeaderRequest) {
+ var localPeer = new Peer(service.topologyService().localMember().address());
- GetLeaderResponse resp = clientMsgFactory.getLeaderResponse().leader(localPeer).build();
+ GetLeaderResponse resp = clientMsgFactory.getLeaderResponse().leader(localPeer).build();
- service.messagingService().send(senderAddr, resp, correlationId);
- }
- else if (message instanceof ActionRequest) {
- ActionRequest req0 = (ActionRequest)message;
+ service.messagingService().send(senderAddr, resp, correlationId);
+ }
+ else if (message instanceof ActionRequest) {
+ ActionRequest req0 = (ActionRequest)message;
- RaftGroupListener lsnr = listeners.get(req0.groupId());
+ RaftGroupListener lsnr = listeners.get(req0.groupId());
- if (lsnr == null) {
- sendError(senderAddr, correlationId, RaftErrorCode.ILLEGAL_STATE);
+ if (lsnr == null) {
+ sendError(senderAddr, correlationId, RaftErrorCode.ILLEGAL_STATE);
- return;
- }
+ return;
+ }
- if (req0.command() instanceof ReadCommand)
- handleActionRequest(senderAddr, req0, correlationId, readQueue, lsnr);
- else
- handleActionRequest(senderAddr, req0, correlationId, writeQueue, lsnr);
+ if (req0.command() instanceof ReadCommand)
+ handleActionRequest(senderAddr, req0, correlationId, readQueue, lsnr);
+ else
+ handleActionRequest(senderAddr, req0, correlationId, writeQueue, lsnr);
+ }
+ // TODO https://issues.apache.org/jira/browse/IGNITE-14775
}
- // TODO https://issues.apache.org/jira/browse/IGNITE-14775
- });
+ );
readWorker = new Thread(() -> processQueue(readQueue, RaftGroupListener::onRead), "read-cmd-worker#" + service.topologyService().localMember().toString());
readWorker.setDaemon(true);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcServer.java
index 7afb6c4..cc9f8b0 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcServer.java
@@ -23,20 +23,19 @@ import org.apache.ignite.raft.jraft.rpc.impl.ConnectionClosedEventListener;
*
*/
public interface RpcServer<T> extends Lifecycle<T> {
-
/**
* Register a conn closed event listener.
*
* @param listener the event listener.
*/
- void registerConnectionClosedEventListener(final ConnectionClosedEventListener listener);
+ void registerConnectionClosedEventListener(ConnectionClosedEventListener listener);
/**
* Register user processor.
*
* @param processor the user processor which has a interest
*/
- void registerProcessor(final RpcProcessor<?> processor);
+ void registerProcessor(RpcProcessor<?> processor);
/**
* @return bound port
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 03833ed..fa7f43c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -21,6 +21,9 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.raft.client.message.RaftClientMessageGroup;
+import org.apache.ignite.raft.jraft.RaftMessageGroup;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
@@ -58,6 +61,10 @@ import org.jetbrains.annotations.Nullable;
public class IgniteRpcServer implements RpcServer<Void> {
private final ClusterService service;
+ private final NodeManager nodeManager;
+
+ private final Executor rpcExecutor;
+
private final List<ConnectionClosedEventListener> listeners = new CopyOnWriteArrayList<>();
private final Map<String, RpcProcessor> processors = new ConcurrentHashMap<>();
@@ -77,6 +84,8 @@ public class IgniteRpcServer implements RpcServer<Void> {
@Nullable Executor rpcExecutor
) {
this.service = service;
+ this.nodeManager = nodeManager;
+ this.rpcExecutor = rpcExecutor;
// raft server RPC
AppendEntriesRequestProcessor appendEntriesRequestProcessor =
@@ -106,8 +115,30 @@ public class IgniteRpcServer implements RpcServer<Void> {
registerProcessor(new ActionRequestProcessor(rpcExecutor, raftClientMessagesFactory));
registerProcessor(new org.apache.ignite.raft.jraft.rpc.impl.client.SnapshotRequestProcessor(rpcExecutor, raftClientMessagesFactory));
- service.messagingService().addMessageHandler((msg, senderAddr, corellationId) -> {
- Class<? extends NetworkMessage> cls = msg.getClass();
+ var messageHandler = new RpcMessageHandler();
+
+ service.messagingService().addMessageHandler(RaftMessageGroup.class, messageHandler);
+ service.messagingService().addMessageHandler(RaftClientMessageGroup.class, messageHandler);
+
+ service.topologyService().addEventHandler(new TopologyEventHandler() {
+ @Override public void onAppeared(ClusterNode member) {
+ // TODO asch optimize start replicator https://issues.apache.org/jira/browse/IGNITE-14843
+ }
+
+ @Override public void onDisappeared(ClusterNode member) {
+ for (ConnectionClosedEventListener listener : listeners)
+ listener.onClosed(service.topologyService().localMember().name(), member.name());
+ }
+ });
+ }
+
+ /**
+ * Implementation of a message handler that dispatches the incoming requests to a suitable {@link RpcProcessor}.
+ */
+ public class RpcMessageHandler implements NetworkMessageHandler {
+ /** {@inheritDoc} */
+ @Override public void onReceived(NetworkMessage message, NetworkAddress senderAddr, String correlationId) {
+ Class<? extends NetworkMessage> cls = message.getClass();
RpcProcessor<NetworkMessage> prc = processors.get(cls.getName());
// TODO asch cache mapping https://issues.apache.org/jira/browse/IGNITE-14832
@@ -128,7 +159,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
Executor executor = null;
if (selector != null)
- executor = selector.select(prc.getClass().getName(), msg, nodeManager);
+ executor = selector.select(prc.getClass().getName(), message, nodeManager);
if (executor == null)
executor = prc.executor();
@@ -145,7 +176,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
}
@Override public void sendResponse(Object responseObj) {
- service.messagingService().send(senderAddr, (NetworkMessage) responseObj, corellationId);
+ service.messagingService().send(senderAddr, (NetworkMessage) responseObj, correlationId);
}
@Override public NetworkAddress getRemoteAddress() {
@@ -157,20 +188,9 @@ public class IgniteRpcServer implements RpcServer<Void> {
}
};
- finalPrc.handleRequest(context, msg);
+ finalPrc.handleRequest(context, message);
});
- });
-
- service.topologyService().addEventHandler(new TopologyEventHandler() {
- @Override public void onAppeared(ClusterNode member) {
- // TODO asch optimize start replicator https://issues.apache.org/jira/browse/IGNITE-14843
- }
-
- @Override public void onDisappeared(ClusterNode member) {
- for (ConnectionClosedEventListener listener : listeners)
- listener.onClosed(service.topologyService().localMember().name(), member.name());
- }
- });
+ }
}
/** {@inheritDoc} */
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
index 6bc1a1e..a621b39 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
@@ -29,6 +29,7 @@ import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
* RPC server configured for integration tests.
*/
public class TestIgniteRpcServer extends IgniteRpcServer {
+ /** */
private final NodeOptions nodeOptions;
/**
@@ -51,9 +52,12 @@ public class TestIgniteRpcServer extends IgniteRpcServer {
JRaftUtils.createRequestExecutor(nodeOptions)
);
+ clusterService.messagingService().addMessageHandler(TestMessageGroup.class, new RpcMessageHandler());
+
this.nodeOptions = nodeOptions;
}
+ /** {@inheritDoc} */
@Override public void shutdown() {
super.shutdown();
diff --git a/modules/vault/src/integrationTest/java/org/apache/ignite/internal/vault/persistence/PersistencePropertiesVaultServiceTest.java b/modules/vault/src/integrationTest/java/org/apache/ignite/internal/vault/persistence/PersistencePropertiesVaultServiceTest.java
index 410afcb..a871fda 100644
--- a/modules/vault/src/integrationTest/java/org/apache/ignite/internal/vault/persistence/PersistencePropertiesVaultServiceTest.java
+++ b/modules/vault/src/integrationTest/java/org/apache/ignite/internal/vault/persistence/PersistencePropertiesVaultServiceTest.java
@@ -32,7 +32,7 @@ import org.apache.ignite.lang.ByteArray;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import static org.apache.ignite.internal.vault.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
diff --git a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java
index d8283cd..89c61d5 100644
--- a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java
+++ b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultManagerTest.java
@@ -22,7 +22,7 @@ import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
-import static org.apache.ignite.internal.vault.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
diff --git a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java
index 5166587..e2337c9 100644
--- a/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java
+++ b/modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java
@@ -35,7 +35,7 @@ import org.junit.jupiter.api.Test;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
-import static org.apache.ignite.internal.vault.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;