You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/03/17 11:30:10 UTC

[ignite-3] branch main updated: IGNITE-16699 Properly stop executors in the network module (#731)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 066ba91  IGNITE-16699 Properly stop executors in the network module (#731)
066ba91 is described below

commit 066ba919d25a6625f25befd38510715e6722cbf6
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Thu Mar 17 14:30:01 2022 +0300

    IGNITE-16699 Properly stop executors in the network module (#731)
---
 .../ignite/internal/network/netty/ConnectionManager.java   | 14 ++++++++++----
 .../apache/ignite/internal/network/netty/NettySender.java  | 13 ++++++++++++-
 .../org/apache/ignite/network/DefaultMessagingService.java |  5 +++--
 3 files changed, 25 insertions(+), 7 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index a95d285..7c1ae03 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -208,7 +208,11 @@ public class ConnectionManager {
      * @param channel Channel from client to this {@link #server}.
      */
     private void onNewIncomingChannel(NettySender channel) {
-        channels.put(channel.consistentId(), channel);
+        NettySender oldChannel = channels.put(channel.consistentId(), channel);
+
+        if (oldChannel != null) {
+            oldChannel.close();
+        }
     }
 
     /**
@@ -255,9 +259,11 @@ public class ConnectionManager {
             return;
         }
 
-        Stream<CompletableFuture<Void>> stream = Stream.concat(
-                clients.values().stream().map(NettyClient::stop),
-                Stream.of(server.stop())
+        Stream<CompletableFuture<Void>> stream = Stream.concat(Stream.concat(
+                    clients.values().stream().map(NettyClient::stop),
+                    Stream.of(server.stop())
+                ),
+                channels.values().stream().map(NettySender::closeAsync)
         );
 
         CompletableFuture<Void> stopFut = CompletableFuture.allOf(stream.toArray(CompletableFuture<?>[]::new));
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
index f926739..12d89b1 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.network.netty;
 
+import static org.apache.ignite.internal.network.netty.NettyUtils.toCompletableFuture;
+
 import io.netty.channel.Channel;
 import io.netty.handler.stream.ChunkedInput;
 import java.util.concurrent.CompletableFuture;
@@ -57,7 +59,7 @@ public class NettySender {
      * @return Future of the send operation.
      */
     public CompletableFuture<Void> send(OutNetworkObject obj) {
-        return NettyUtils.toCompletableFuture(channel.writeAndFlush(obj));
+        return toCompletableFuture(channel.writeAndFlush(obj));
     }
 
     /**
@@ -86,6 +88,15 @@ public class NettySender {
     }
 
     /**
+     * Closes channel asynchronously.
+     *
+     * @return Future of the close operation.
+     */
+    public CompletableFuture<Void> closeAsync() {
+        return toCompletableFuture(this.channel.close());
+    }
+
+    /**
      * Returns {@code true} if the channel is open, {@code false} otherwise.
      *
      * @return {@code true} if the channel is open, {@code false} otherwise.
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index e677586..455f2b9 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
 import org.apache.ignite.internal.network.serialization.DescriptorRegistry;
 import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
 import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.jetbrains.annotations.Nullable;
@@ -414,7 +415,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         requestsMap.clear();
 
-        inboundService.shutdown();
-        outboundService.shutdown();
+        IgniteUtils.shutdownAndAwaitTermination(inboundService, 10, TimeUnit.SECONDS);
+        IgniteUtils.shutdownAndAwaitTermination(outboundService, 10, TimeUnit.SECONDS);
     }
 }