You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/11/22 08:27:15 UTC

[flink] branch release-1.16 updated: [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new d8f7777f321 [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest
d8f7777f321 is described below

commit d8f7777f3216101fd31dbcdc4a725d2e7ead4113
Author: Yun Gao <ga...@gmail.com>
AuthorDate: Mon Dec 7 18:06:34 2020 +0800

    [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest
---
 .../io/network/netty/NettyBufferPoolTest.java      | 58 ++++++++++++++++------
 1 file changed, 44 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java
index 3797d025e77..0aaa11ab64c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java
@@ -18,36 +18,61 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import org.junit.After;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /** Tests for the {@link NettyBufferPool} wrapper. */
 public class NettyBufferPoolTest {
 
+    private final List<ByteBuf> needReleasing = new ArrayList<>();
+
+    @After
+    public void tearDown() {
+        try {
+            // Release all of the buffers.
+            for (ByteBuf buf : needReleasing) {
+                buf.release();
+            }
+
+            // Checks in a separate loop in case we have sliced buffers.
+            for (ByteBuf buf : needReleasing) {
+                assertEquals(0, buf.refCnt());
+            }
+        } finally {
+            needReleasing.clear();
+        }
+    }
+
     @Test
     public void testNoHeapAllocations() throws Exception {
-        NettyBufferPool nettyBufferPool = new NettyBufferPool(1);
+        final NettyBufferPool nettyBufferPool = new NettyBufferPool(1);
 
         // Buffers should prefer to be direct
-        assertTrue(nettyBufferPool.buffer().isDirect());
-        assertTrue(nettyBufferPool.buffer(128).isDirect());
-        assertTrue(nettyBufferPool.buffer(128, 256).isDirect());
+        assertTrue(releaseLater(nettyBufferPool.buffer()).isDirect());
+        assertTrue(releaseLater(nettyBufferPool.buffer(128)).isDirect());
+        assertTrue(releaseLater(nettyBufferPool.buffer(128, 256)).isDirect());
 
         // IO buffers should prefer to be direct
-        assertTrue(nettyBufferPool.ioBuffer().isDirect());
-        assertTrue(nettyBufferPool.ioBuffer(128).isDirect());
-        assertTrue(nettyBufferPool.ioBuffer(128, 256).isDirect());
+        assertTrue(releaseLater(nettyBufferPool.ioBuffer()).isDirect());
+        assertTrue(releaseLater(nettyBufferPool.ioBuffer(128)).isDirect());
+        assertTrue(releaseLater(nettyBufferPool.ioBuffer(128, 256)).isDirect());
 
         // Currently we fakes the heap buffer allocation with direct buffers
-        assertTrue(nettyBufferPool.heapBuffer().isDirect());
-        assertTrue(nettyBufferPool.heapBuffer(128).isDirect());
-        assertTrue(nettyBufferPool.heapBuffer(128, 256).isDirect());
+        assertTrue(releaseLater(nettyBufferPool.heapBuffer()).isDirect());
+        assertTrue(releaseLater(nettyBufferPool.heapBuffer(128)).isDirect());
+        assertTrue(releaseLater(nettyBufferPool.heapBuffer(128, 256)).isDirect());
 
         // Composite buffers allocates the corresponding type of buffers when extending its capacity
-        assertTrue(nettyBufferPool.compositeHeapBuffer().capacity(1024).isDirect());
-        assertTrue(nettyBufferPool.compositeHeapBuffer(10).capacity(1024).isDirect());
+        assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer()).capacity(1024).isDirect());
+        assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer(10)).capacity(1024).isDirect());
 
         // Is direct buffer pooled!
         assertTrue(nettyBufferPool.isDirectBufferPooled());
@@ -60,16 +85,21 @@ public class NettyBufferPoolTest {
 
         {
             // Single large buffer allocates one chunk
-            nettyBufferPool.directBuffer(chunkSize - 64);
+            releaseLater(nettyBufferPool.directBuffer(chunkSize - 64));
             long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get();
             assertEquals(chunkSize, allocated);
         }
 
         {
             // Allocate a little more (one more chunk required)
-            nettyBufferPool.directBuffer(128);
+            releaseLater(nettyBufferPool.directBuffer(128));
             long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get();
             assertEquals(2 * chunkSize, allocated);
         }
     }
+
+    private ByteBuf releaseLater(ByteBuf buf) {
+        needReleasing.add(buf);
+        return buf;
+    }
 }