You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2024/02/06 15:44:35 UTC

(ignite-3) branch main updated: IGNITE-21470 Wait for sender to appear before applying an ack silencer (#3163)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c0f92249c2 IGNITE-21470 Wait for sender to appear before applying an ack silencer (#3163)
c0f92249c2 is described below

commit c0f92249c2d5ac235710520b13bfbb2412c27928
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Tue Feb 6 19:44:29 2024 +0400

    IGNITE-21470 Wait for sender to appear before applying an ack silencer (#3163)
---
 .../internal/network/netty/ItConnectionManagerTest.java | 17 +++++++++++++++++
 .../network/netty/OutgoingAcknowledgementSilencer.java  |  5 +++++
 .../ignite/internal/network/netty/NettySender.java      |  8 ++++++++
 3 files changed, 30 insertions(+)

diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 65122b0481..a921dadfd0 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -388,6 +388,7 @@ public class ItConnectionManagerTest extends BaseIgniteAbstractTest {
                 ConnectionManagerWrapper manager2 = startManager(4001)
         ) {
             NettySender sender = manager1.openChannelTo(manager2).toCompletableFuture().get(10, TimeUnit.SECONDS);
+            waitTillChannelAppearsInMapOnAcceptor(sender, manager1, manager2);
 
             OutgoingAcknowledgementSilencer ackSilencer = dropAcksFrom(manager2);
 
@@ -402,6 +403,22 @@ public class ItConnectionManagerTest extends BaseIgniteAbstractTest {
         }
     }
 
+    private static void waitTillChannelAppearsInMapOnAcceptor(
+            NettySender senderFromOpener,
+            ConnectionManagerWrapper opener,
+            ConnectionManagerWrapper acceptor
+    ) throws InterruptedException {
+        assertTrue(
+                waitForCondition(
+                        () -> acceptor.channels().values().stream().anyMatch(acceptorSender
+                                -> acceptorSender.consistentId().equals(opener.connectionManager.consistentId())
+                                        && acceptorSender.channelId() == senderFromOpener.channelId()),
+                        TimeUnit.SECONDS.toMillis(10)
+                ),
+                "Did not observe the sender appearing in the acceptor's sender map in time"
+        );
+    }
+
     @Test
     public void sendFuturesCompleteInSendOrder() throws Exception {
         try (
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/OutgoingAcknowledgementSilencer.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/OutgoingAcknowledgementSilencer.java
index 160a16263e..e18f8458ee 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/OutgoingAcknowledgementSilencer.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/OutgoingAcknowledgementSilencer.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.network.netty;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import io.netty.channel.ChannelHandler.Sharable;
@@ -50,6 +53,8 @@ public class OutgoingAcknowledgementSilencer extends ChannelOutboundHandlerAdapt
      */
     public static OutgoingAcknowledgementSilencer installOn(Collection<NettySender> senders)
             throws InterruptedException {
+        assertThat(senders, not(empty()));
+
         OutgoingAcknowledgementSilencer ackSilencer = new OutgoingAcknowledgementSilencer(senders.size());
 
         for (NettySender sender : senders) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
index 82c570d8d3..7bff673b6e 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
@@ -30,6 +30,8 @@ import org.apache.ignite.internal.network.NettyBootstrapFactory;
 import org.apache.ignite.internal.network.OutNetworkObject;
 import org.apache.ignite.internal.network.direct.DirectMessageWriter;
 import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.tostring.IgniteToStringExclude;
+import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -49,6 +51,7 @@ public class NettySender {
 
     private final short channelId;
 
+    @IgniteToStringExclude
     private final RecoveryDescriptor recoveryDescriptor;
 
     /**
@@ -234,4 +237,9 @@ public class NettySender {
     public RecoveryDescriptor recoveryDescriptor() {
         return recoveryDescriptor;
     }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
 }