You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/02/21 21:39:42 UTC
[ignite-3] branch ignite-15655-tc updated: .
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch ignite-15655-tc
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-15655-tc by this push:
new c688fd3 .
c688fd3 is described below
commit c688fd381fe84e7f3e11f278bbf0335c1f6baf61
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Tue Feb 22 00:39:34 2022 +0300
.
---
.../ignite/internal/network/netty/ConnectionManager.java | 10 +++++++++-
.../network/scalecube/ScaleCubeDirectMarshallerTransport.java | 5 +++++
2 files changed, 14 insertions(+), 1 deletion(-)
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index d8f8967..0363469 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -30,6 +30,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
@@ -194,6 +196,8 @@ public class ConnectionManager {
return sender;
}
+ private ExecutorService svc = Executors.newFixedThreadPool(20);
+
/**
* Callback that is called upon receiving a new message.
*
@@ -201,7 +205,9 @@ public class ConnectionManager {
* @param message New message.
*/
private void onMessage(String consistentId, NetworkMessage message) {
- listeners.forEach(consumer -> consumer.accept(consistentId, message));
+ svc.submit(() -> {
+ listeners.forEach(consumer -> consumer.accept(consistentId, message));
+ });
}
/**
@@ -269,6 +275,8 @@ public class ConnectionManager {
} catch (Exception e) {
LOG.warn("Failed to stop the ConnectionManager: {}", e.getMessage());
}
+
+ svc.shutdown();
}
/**
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index ad763de..effb782 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -25,6 +25,8 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.message.ScaleCubeMessage;
import org.apache.ignite.internal.network.message.ScaleCubeMessageBuilder;
@@ -153,6 +155,7 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
/** {@inheritDoc} */
@Override
public Mono<Void> stop() {
+ svc.shutdown();
return Mono.defer(() -> {
stop.onComplete();
return onStop;
@@ -179,6 +182,8 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
});
}
+ private final ExecutorService svc = Executors.newSingleThreadExecutor();
+
/**
* Handles new network messages from {@link #connectionManager}.
*