You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/11/22 08:27:15 UTC
[flink] branch release-1.16 updated: [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest
This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new d8f7777f321 [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest
d8f7777f321 is described below
commit d8f7777f3216101fd31dbcdc4a725d2e7ead4113
Author: Yun Gao <ga...@gmail.com>
AuthorDate: Mon Dec 7 18:06:34 2020 +0800
[FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest
---
.../io/network/netty/NettyBufferPoolTest.java | 58 ++++++++++++++++------
1 file changed, 44 insertions(+), 14 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java
index 3797d025e77..0aaa11ab64c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java
@@ -18,36 +18,61 @@
package org.apache.flink.runtime.io.network.netty;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import org.junit.After;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** Tests for the {@link NettyBufferPool} wrapper. */
public class NettyBufferPoolTest {
+ private final List<ByteBuf> needReleasing = new ArrayList<>();
+
+ @After
+ public void tearDown() {
+ try {
+ // Release all of the buffers.
+ for (ByteBuf buf : needReleasing) {
+ buf.release();
+ }
+
+ // Checks in a separate loop in case we have sliced buffers.
+ for (ByteBuf buf : needReleasing) {
+ assertEquals(0, buf.refCnt());
+ }
+ } finally {
+ needReleasing.clear();
+ }
+ }
+
@Test
public void testNoHeapAllocations() throws Exception {
- NettyBufferPool nettyBufferPool = new NettyBufferPool(1);
+ final NettyBufferPool nettyBufferPool = new NettyBufferPool(1);
// Buffers should prefer to be direct
- assertTrue(nettyBufferPool.buffer().isDirect());
- assertTrue(nettyBufferPool.buffer(128).isDirect());
- assertTrue(nettyBufferPool.buffer(128, 256).isDirect());
+ assertTrue(releaseLater(nettyBufferPool.buffer()).isDirect());
+ assertTrue(releaseLater(nettyBufferPool.buffer(128)).isDirect());
+ assertTrue(releaseLater(nettyBufferPool.buffer(128, 256)).isDirect());
// IO buffers should prefer to be direct
- assertTrue(nettyBufferPool.ioBuffer().isDirect());
- assertTrue(nettyBufferPool.ioBuffer(128).isDirect());
- assertTrue(nettyBufferPool.ioBuffer(128, 256).isDirect());
+ assertTrue(releaseLater(nettyBufferPool.ioBuffer()).isDirect());
+ assertTrue(releaseLater(nettyBufferPool.ioBuffer(128)).isDirect());
+ assertTrue(releaseLater(nettyBufferPool.ioBuffer(128, 256)).isDirect());
// Currently we fakes the heap buffer allocation with direct buffers
- assertTrue(nettyBufferPool.heapBuffer().isDirect());
- assertTrue(nettyBufferPool.heapBuffer(128).isDirect());
- assertTrue(nettyBufferPool.heapBuffer(128, 256).isDirect());
+ assertTrue(releaseLater(nettyBufferPool.heapBuffer()).isDirect());
+ assertTrue(releaseLater(nettyBufferPool.heapBuffer(128)).isDirect());
+ assertTrue(releaseLater(nettyBufferPool.heapBuffer(128, 256)).isDirect());
// Composite buffers allocates the corresponding type of buffers when extending its capacity
- assertTrue(nettyBufferPool.compositeHeapBuffer().capacity(1024).isDirect());
- assertTrue(nettyBufferPool.compositeHeapBuffer(10).capacity(1024).isDirect());
+ assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer()).capacity(1024).isDirect());
+ assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer(10)).capacity(1024).isDirect());
// Is direct buffer pooled!
assertTrue(nettyBufferPool.isDirectBufferPooled());
@@ -60,16 +85,21 @@ public class NettyBufferPoolTest {
{
// Single large buffer allocates one chunk
- nettyBufferPool.directBuffer(chunkSize - 64);
+ releaseLater(nettyBufferPool.directBuffer(chunkSize - 64));
long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get();
assertEquals(chunkSize, allocated);
}
{
// Allocate a little more (one more chunk required)
- nettyBufferPool.directBuffer(128);
+ releaseLater(nettyBufferPool.directBuffer(128));
long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get();
assertEquals(2 * chunkSize, allocated);
}
}
+
+ private ByteBuf releaseLater(ByteBuf buf) {
+ needReleasing.add(buf);
+ return buf;
+ }
}