You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/03/17 11:30:10 UTC
[ignite-3] branch main updated: IGNITE-16699 Properly stop executors in the network module (#731)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 066ba91 IGNITE-16699 Properly stop executors in the network module (#731)
066ba91 is described below
commit 066ba919d25a6625f25befd38510715e6722cbf6
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Thu Mar 17 14:30:01 2022 +0300
IGNITE-16699 Properly stop executors in the network module (#731)
---
.../ignite/internal/network/netty/ConnectionManager.java | 14 ++++++++++----
.../apache/ignite/internal/network/netty/NettySender.java | 13 ++++++++++++-
.../org/apache/ignite/network/DefaultMessagingService.java | 5 +++--
3 files changed, 25 insertions(+), 7 deletions(-)
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index a95d285..7c1ae03 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -208,7 +208,11 @@ public class ConnectionManager {
* @param channel Channel from client to this {@link #server}.
*/
private void onNewIncomingChannel(NettySender channel) {
- channels.put(channel.consistentId(), channel);
+ NettySender oldChannel = channels.put(channel.consistentId(), channel);
+
+ if (oldChannel != null) {
+ oldChannel.close();
+ }
}
/**
@@ -255,9 +259,11 @@ public class ConnectionManager {
return;
}
- Stream<CompletableFuture<Void>> stream = Stream.concat(
- clients.values().stream().map(NettyClient::stop),
- Stream.of(server.stop())
+ Stream<CompletableFuture<Void>> stream = Stream.concat(Stream.concat(
+ clients.values().stream().map(NettyClient::stop),
+ Stream.of(server.stop())
+ ),
+ channels.values().stream().map(NettySender::closeAsync)
);
CompletableFuture<Void> stopFut = CompletableFuture.allOf(stream.toArray(CompletableFuture<?>[]::new));
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
index f926739..12d89b1 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.network.netty;
+import static org.apache.ignite.internal.network.netty.NettyUtils.toCompletableFuture;
+
import io.netty.channel.Channel;
import io.netty.handler.stream.ChunkedInput;
import java.util.concurrent.CompletableFuture;
@@ -57,7 +59,7 @@ public class NettySender {
* @return Future of the send operation.
*/
public CompletableFuture<Void> send(OutNetworkObject obj) {
- return NettyUtils.toCompletableFuture(channel.writeAndFlush(obj));
+ return toCompletableFuture(channel.writeAndFlush(obj));
}
/**
@@ -86,6 +88,15 @@ public class NettySender {
}
/**
+ * Closes channel asynchronously.
+ *
+ * @return Future of the close operation.
+ */
+ public CompletableFuture<Void> closeAsync() {
+ return toCompletableFuture(this.channel.close());
+ }
+
+ /**
* Returns {@code true} if the channel is open, {@code false} otherwise.
*
* @return {@code true} if the channel is open, {@code false} otherwise.
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index e677586..455f2b9 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
import org.apache.ignite.internal.network.serialization.DescriptorRegistry;
import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.NodeStoppingException;
import org.jetbrains.annotations.Nullable;
@@ -414,7 +415,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
requestsMap.clear();
- inboundService.shutdown();
- outboundService.shutdown();
+ IgniteUtils.shutdownAndAwaitTermination(inboundService, 10, TimeUnit.SECONDS);
+ IgniteUtils.shutdownAndAwaitTermination(outboundService, 10, TimeUnit.SECONDS);
}
}