You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/02/21 21:39:42 UTC

[ignite-3] branch ignite-15655-tc updated: .

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-15655-tc
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-15655-tc by this push:
     new c688fd3  .
c688fd3 is described below

commit c688fd381fe84e7f3e11f278bbf0335c1f6baf61
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Tue Feb 22 00:39:34 2022 +0300

    .
---
 .../ignite/internal/network/netty/ConnectionManager.java       | 10 +++++++++-
 .../network/scalecube/ScaleCubeDirectMarshallerTransport.java  |  5 +++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index d8f8967..0363469 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -30,6 +30,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
@@ -194,6 +196,8 @@ public class ConnectionManager {
         return sender;
     }
 
+    private ExecutorService svc = Executors.newFixedThreadPool(20);
+
     /**
      * Callback that is called upon receiving a new message.
      *
@@ -201,7 +205,9 @@ public class ConnectionManager {
      * @param message New message.
      */
     private void onMessage(String consistentId, NetworkMessage message) {
-        listeners.forEach(consumer -> consumer.accept(consistentId, message));
+        svc.submit(() -> {
+            listeners.forEach(consumer -> consumer.accept(consistentId, message));
+        });
     }
 
     /**
@@ -269,6 +275,8 @@ public class ConnectionManager {
         } catch (Exception e) {
             LOG.warn("Failed to stop the ConnectionManager: {}", e.getMessage());
         }
+
+        svc.shutdown();
     }
 
     /**
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index ad763de..effb782 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -25,6 +25,8 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.message.ScaleCubeMessage;
 import org.apache.ignite.internal.network.message.ScaleCubeMessageBuilder;
@@ -153,6 +155,7 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
     /** {@inheritDoc} */
     @Override
     public Mono<Void> stop() {
+        svc.shutdown();
         return Mono.defer(() -> {
             stop.onComplete();
             return onStop;
@@ -179,6 +182,8 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
         });
     }
 
+    private final ExecutorService svc = Executors.newSingleThreadExecutor();
+
     /**
      * Handles new network messages from {@link #connectionManager}.
      *