You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/05/26 14:31:32 UTC

[ignite-3] branch main updated: IGNITE-17040 Fix Netty buffer leak detected (#829)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 620fae1de IGNITE-17040 Fix Netty buffer leak detected (#829)
620fae1de is described below

commit 620fae1de6142033e25fd7874e268a1636db2557
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Thu May 26 18:31:29 2022 +0400

    IGNITE-17040 Fix Netty buffer leak detected (#829)
---
 .../internal/network/netty/InboundDecoderTest.java |  7 ++++++
 .../internal/network/netty/NettyClientTest.java    | 10 ++++++++
 .../internal/network/netty/NettyServerTest.java    |  3 +++
 .../network/netty/RecoveryHandshakeTest.java       | 23 ++++++++++++++++++
 .../network/serialization/MarshallableTest.java    | 28 +++++++++-------------
 5 files changed, 54 insertions(+), 17 deletions(-)

diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
index 3f7dd0285..04853d819 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.network.netty;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -123,6 +124,8 @@ public class InboundDecoderTest {
             channel.writeInbound(buffer);
         } while ((received = channel.readInbound()) == null);
 
+        assertFalse(channel.finish());
+
         return received;
     }
 
@@ -150,6 +153,8 @@ public class InboundDecoderTest {
                     channel.readInbound();
                 })
                 .get(3, TimeUnit.SECONDS);
+
+        assertFalse(channel.finish());
     }
 
     /**
@@ -223,6 +228,8 @@ public class InboundDecoderTest {
         TestMessage actualMessage = (TestMessage) list.get(0);
 
         assertEquals(msg, actualMessage);
+
+        assertFalse(channel.finish());
     }
 
     /**
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
index 95dd327d1..9dd9c0e66 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
@@ -77,6 +77,8 @@ public class NettyClientTest {
 
         assertFalse(client.failedToConnect());
         assertFalse(client.isDisconnected());
+
+        assertFalse(channel.finish());
     }
 
     /**
@@ -102,6 +104,8 @@ public class NettyClientTest {
 
         assertTrue(client.failedToConnect());
         assertFalse(client.isDisconnected());
+
+        assertFalse(channel.finish());
     }
 
     /**
@@ -124,6 +128,8 @@ public class NettyClientTest {
 
         assertTrue(tuple.client.isDisconnected());
         assertFalse(tuple.client.failedToConnect());
+
+        assertFalse(channel.finish());
     }
 
     /**
@@ -149,6 +155,8 @@ public class NettyClientTest {
 
         assertTrue(client.isDisconnected());
         assertTrue(client.failedToConnect());
+
+        assertFalse(channel.finish());
     }
 
     /**
@@ -177,6 +185,8 @@ public class NettyClientTest {
         assertThrows(IgniteInternalException.class, () -> {
             client.start(bootstrap);
         });
+
+        assertFalse(channel.finish());
     }
 
     /**
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index f07fb5395..710861c64 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.network.netty;
 
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -124,6 +125,8 @@ public class NettyServerTest {
         future.setSuccess(null);
 
         stop.get(3, TimeUnit.SECONDS);
+
+        assertFalse(channel.finish());
     }
 
     /**
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index e83384a2f..f28832740 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -95,6 +95,9 @@ public class RecoveryHandshakeTest {
 
         checkPipelineAfterHandshake(serverSideChannel);
         checkPipelineAfterHandshake(clientSideChannel);
+
+        assertFalse(serverSideChannel.finish());
+        assertFalse(clientSideChannel.finish());
     }
 
     @Test
@@ -145,6 +148,9 @@ public class RecoveryHandshakeTest {
 
         checkPipelineAfterHandshake(serverSideChannel);
         checkPipelineAfterHandshake(clientSideChannel);
+
+        assertFalse(serverSideChannel.finish());
+        assertFalse(clientSideChannel.finish());
     }
 
     @Test
@@ -195,6 +201,9 @@ public class RecoveryHandshakeTest {
 
         checkPipelineAfterHandshake(serverSideChannel);
         checkPipelineAfterHandshake(clientSideChannel);
+
+        assertFalse(serverSideChannel.finish());
+        assertFalse(clientSideChannel.finish());
     }
 
     @Test
@@ -222,8 +231,16 @@ public class RecoveryHandshakeTest {
         exchangeClientToServer(in1to2, out2to1);
         exchangeClientToServer(in2to1, out1to2);
 
+        exchangeServerToClient(in1to2, out2to1);
+        exchangeServerToClient(in2to1, out1to2);
+
         assertNotSame(chm1.recoveryDescriptor(), shm1.recoveryDescriptor());
         assertNotSame(chm2.recoveryDescriptor(), shm2.recoveryDescriptor());
+
+        assertFalse(out1to2.finish());
+        assertFalse(in1to2.finish());
+        assertFalse(out2to1.finish());
+        assertFalse(in2to1.finish());
     }
 
     @Test
@@ -300,6 +317,9 @@ public class RecoveryHandshakeTest {
 
         var listener2 = new MessageListener("2", receivedSecond);
 
+        clientSideChannel.finishAndReleaseAll();
+        serverSideChannel.finishAndReleaseAll();
+
         clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener2 : noMessageListener);
         serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener2);
 
@@ -326,6 +346,9 @@ public class RecoveryHandshakeTest {
         assertNull(clientSideChannel.readOutbound());
 
         assertTrue(receivedSecond.get());
+
+        assertFalse(serverSideChannel.finish());
+        assertFalse(clientSideChannel.finish());
     }
 
     /** Message listener that accepts a specific message only once. */
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
index f8814515b..574bdd1a2 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
 import static org.hamcrest.collection.IsEmptyCollection.empty;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
@@ -29,7 +30,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.handler.codec.MessageToMessageEncoder;
 import io.netty.handler.stream.ChunkedWriteHandler;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
@@ -100,9 +100,7 @@ public class MarshallableTest {
 
         MessageSerializer<NetworkMessage> serializer = registry.createSerializer(msg.groupType(), msg.messageType());
 
-        var catcher = new OutboundByteBufCatcher();
         var channel = new EmbeddedChannel(
-                catcher,
                 new ChunkedWriteHandler(),
                 new OutboundEncoder(serializers.perSessionSerializationService)
         );
@@ -114,7 +112,14 @@ public class MarshallableTest {
 
         channel.flushOutbound();
 
-        ByteBuffer nioBuffer = catcher.buf;
+        ByteBuffer nioBuffer = ByteBuffer.allocate(1000);
+
+        while (!channel.outboundMessages().isEmpty()) {
+            ByteBuf channelBuf = channel.readOutbound();
+            nioBuffer.put(channelBuf.nioBuffer());
+        }
+
+        assertFalse(channel.finish());
 
         return nioBuffer;
     }
@@ -170,6 +175,8 @@ public class MarshallableTest {
 
         received.unmarshal(serializers.userObjectSerializer, serializers.descriptorRegistry);
 
+        assertFalse(channel.finish());
+
         return received.marshallableMap();
     }
 
@@ -197,19 +204,6 @@ public class MarshallableTest {
         }
     }
 
-    private static class OutboundByteBufCatcher extends MessageToMessageEncoder<ByteBuf> {
-        /** ByteBuffer that records incoming data. */
-        private ByteBuffer buf = ByteBuffer.allocateDirect(1000);
-
-        /** {@inheritDoc} */
-        @Override
-        protected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
-            ByteBuffer nioBuffer = byteBuf.nioBuffer();
-            buf.put(nioBuffer);
-            list.add(1); // Just to make sure netty continues writing to the channel
-        }
-    }
-
     /**
      *  Stub implementation of the {@link UserObjectMarshaller}, which uses the JDK's serializable
      *  serialization to actually marshall an object.