You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/05/26 14:31:32 UTC
[ignite-3] branch main updated: IGNITE-17040 Fix Netty buffer leak detected (#829)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 620fae1de IGNITE-17040 Fix Netty buffer leak detected (#829)
620fae1de is described below
commit 620fae1de6142033e25fd7874e268a1636db2557
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Thu May 26 18:31:29 2022 +0400
IGNITE-17040 Fix Netty buffer leak detected (#829)
---
.../internal/network/netty/InboundDecoderTest.java | 7 ++++++
.../internal/network/netty/NettyClientTest.java | 10 ++++++++
.../internal/network/netty/NettyServerTest.java | 3 +++
.../network/netty/RecoveryHandshakeTest.java | 23 ++++++++++++++++++
.../network/serialization/MarshallableTest.java | 28 +++++++++-------------
5 files changed, 54 insertions(+), 17 deletions(-)
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
index 3f7dd0285..04853d819 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.network.netty;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -123,6 +124,8 @@ public class InboundDecoderTest {
channel.writeInbound(buffer);
} while ((received = channel.readInbound()) == null);
+ assertFalse(channel.finish());
+
return received;
}
@@ -150,6 +153,8 @@ public class InboundDecoderTest {
channel.readInbound();
})
.get(3, TimeUnit.SECONDS);
+
+ assertFalse(channel.finish());
}
/**
@@ -223,6 +228,8 @@ public class InboundDecoderTest {
TestMessage actualMessage = (TestMessage) list.get(0);
assertEquals(msg, actualMessage);
+
+ assertFalse(channel.finish());
}
/**
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
index 95dd327d1..9dd9c0e66 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
@@ -77,6 +77,8 @@ public class NettyClientTest {
assertFalse(client.failedToConnect());
assertFalse(client.isDisconnected());
+
+ assertFalse(channel.finish());
}
/**
@@ -102,6 +104,8 @@ public class NettyClientTest {
assertTrue(client.failedToConnect());
assertFalse(client.isDisconnected());
+
+ assertFalse(channel.finish());
}
/**
@@ -124,6 +128,8 @@ public class NettyClientTest {
assertTrue(tuple.client.isDisconnected());
assertFalse(tuple.client.failedToConnect());
+
+ assertFalse(channel.finish());
}
/**
@@ -149,6 +155,8 @@ public class NettyClientTest {
assertTrue(client.isDisconnected());
assertTrue(client.failedToConnect());
+
+ assertFalse(channel.finish());
}
/**
@@ -177,6 +185,8 @@ public class NettyClientTest {
assertThrows(IgniteInternalException.class, () -> {
client.start(bootstrap);
});
+
+ assertFalse(channel.finish());
}
/**
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index f07fb5395..710861c64 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.network.netty;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -124,6 +125,8 @@ public class NettyServerTest {
future.setSuccess(null);
stop.get(3, TimeUnit.SECONDS);
+
+ assertFalse(channel.finish());
}
/**
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index e83384a2f..f28832740 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -95,6 +95,9 @@ public class RecoveryHandshakeTest {
checkPipelineAfterHandshake(serverSideChannel);
checkPipelineAfterHandshake(clientSideChannel);
+
+ assertFalse(serverSideChannel.finish());
+ assertFalse(clientSideChannel.finish());
}
@Test
@@ -145,6 +148,9 @@ public class RecoveryHandshakeTest {
checkPipelineAfterHandshake(serverSideChannel);
checkPipelineAfterHandshake(clientSideChannel);
+
+ assertFalse(serverSideChannel.finish());
+ assertFalse(clientSideChannel.finish());
}
@Test
@@ -195,6 +201,9 @@ public class RecoveryHandshakeTest {
checkPipelineAfterHandshake(serverSideChannel);
checkPipelineAfterHandshake(clientSideChannel);
+
+ assertFalse(serverSideChannel.finish());
+ assertFalse(clientSideChannel.finish());
}
@Test
@@ -222,8 +231,16 @@ public class RecoveryHandshakeTest {
exchangeClientToServer(in1to2, out2to1);
exchangeClientToServer(in2to1, out1to2);
+ exchangeServerToClient(in1to2, out2to1);
+ exchangeServerToClient(in2to1, out1to2);
+
assertNotSame(chm1.recoveryDescriptor(), shm1.recoveryDescriptor());
assertNotSame(chm2.recoveryDescriptor(), shm2.recoveryDescriptor());
+
+ assertFalse(out1to2.finish());
+ assertFalse(in1to2.finish());
+ assertFalse(out2to1.finish());
+ assertFalse(in2to1.finish());
}
@Test
@@ -300,6 +317,9 @@ public class RecoveryHandshakeTest {
var listener2 = new MessageListener("2", receivedSecond);
+ clientSideChannel.finishAndReleaseAll();
+ serverSideChannel.finishAndReleaseAll();
+
clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener2 : noMessageListener);
serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener2);
@@ -326,6 +346,9 @@ public class RecoveryHandshakeTest {
assertNull(clientSideChannel.readOutbound());
assertTrue(receivedSecond.get());
+
+ assertFalse(serverSideChannel.finish());
+ assertFalse(clientSideChannel.finish());
}
/** Message listener that accepts a specific message only once. */
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
index f8814515b..574bdd1a2 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.hamcrest.collection.IsEmptyCollection.empty;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -29,7 +30,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
@@ -100,9 +100,7 @@ public class MarshallableTest {
MessageSerializer<NetworkMessage> serializer = registry.createSerializer(msg.groupType(), msg.messageType());
- var catcher = new OutboundByteBufCatcher();
var channel = new EmbeddedChannel(
- catcher,
new ChunkedWriteHandler(),
new OutboundEncoder(serializers.perSessionSerializationService)
);
@@ -114,7 +112,14 @@ public class MarshallableTest {
channel.flushOutbound();
- ByteBuffer nioBuffer = catcher.buf;
+ ByteBuffer nioBuffer = ByteBuffer.allocate(1000);
+
+ while (!channel.outboundMessages().isEmpty()) {
+ ByteBuf channelBuf = channel.readOutbound();
+ nioBuffer.put(channelBuf.nioBuffer());
+ }
+
+ assertFalse(channel.finish());
return nioBuffer;
}
@@ -170,6 +175,8 @@ public class MarshallableTest {
received.unmarshal(serializers.userObjectSerializer, serializers.descriptorRegistry);
+ assertFalse(channel.finish());
+
return received.marshallableMap();
}
@@ -197,19 +204,6 @@ public class MarshallableTest {
}
}
- private static class OutboundByteBufCatcher extends MessageToMessageEncoder<ByteBuf> {
- /** ByteBuffer that records incoming data. */
- private ByteBuffer buf = ByteBuffer.allocateDirect(1000);
-
- /** {@inheritDoc} */
- @Override
- protected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
- ByteBuffer nioBuffer = byteBuf.nioBuffer();
- buf.put(nioBuffer);
- list.add(1); // Just to make sure netty continues writing to the channel
- }
- }
-
/**
* Stub implementation of the {@link UserObjectMarshaller}, which uses the JDK's serializable
* serialization to actually marshall an object.