You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/01/13 14:13:02 UTC

[flink] branch release-1.14 updated (e93fb03 -> 7784ec7)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from e93fb03  [FLINK-25280][connector/kafka] Disable log deletion in KafkaTestEnvironmentImpl to prevent records from being deleted during test run
     new 1c151e46 [hotfix] Rename some methods of NetworkBufferPool and add more comments for better readability
     new 7784ec7  [FLINK-25407][network] Fix the issues caused by FLINK-24035

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/core/memory/MemorySegmentProvider.java   |  4 +-
 .../runtime/io/network/buffer/LocalBufferPool.java |  6 +-
 .../io/network/buffer/NetworkBufferPool.java       | 52 +++++++++---
 .../network/partition/consumer/BufferManager.java  |  7 +-
 .../io/network/buffer/BufferPoolFactoryTest.java   | 12 +--
 .../io/network/buffer/LocalBufferPoolTest.java     | 63 ++++++++++++++-
 .../io/network/buffer/NetworkBufferPoolTest.java   | 93 +++++++++++-----------
 .../network/partition/InputChannelTestUtils.java   | 10 ++-
 8 files changed, 170 insertions(+), 77 deletions(-)

[flink] 02/02: [FLINK-25407][network] Fix the issues caused by FLINK-24035

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7784ec7284bfc26a2561d8cd9ec3ab4aead6c104
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Thu Dec 30 21:56:15 2021 +0800

    [FLINK-25407][network] Fix the issues caused by FLINK-24035
    
    This PR tries to fix the issues caused by FLINK-24035. More specifically, there are two issues, the first one is the deadlock caused by acquiring the 'factoryLock' in NetworkBufferPool and the other is the incorrect decreasing of the required segments of NetworkBufferPool. Both issues occur during exception handling of requesting segments. Actually, when reserving memory segments for LocalBufferPool, there is no need to modify the value of required segments. As a result, there is no n [...]
    
    This closes #18173.
---
 .../io/network/buffer/NetworkBufferPool.java       | 17 ++++---
 .../io/network/buffer/LocalBufferPoolTest.java     | 59 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 509db03..d9717a4 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -208,7 +208,13 @@ public class NetworkBufferPool
             tryRedistributeBuffers(numberOfSegmentsToRequest);
         }
 
-        return internalRequestMemorySegments(numberOfSegmentsToRequest);
+        try {
+            return internalRequestMemorySegments(numberOfSegmentsToRequest);
+        } catch (IOException exception) {
+            revertRequiredBuffers(numberOfSegmentsToRequest);
+            ExceptionUtils.rethrowIOException(exception);
+            return null;
+        }
     }
 
     private List<MemorySegment> internalRequestMemorySegments(int numberOfSegmentsToRequest)
@@ -248,7 +254,7 @@ public class NetworkBufferPool
                 }
             }
         } catch (Throwable e) {
-            recycleMemorySegments(segments, numberOfSegmentsToRequest);
+            internalRecycleMemorySegments(segments);
             ExceptionUtils.rethrowIOException(e);
         }
 
@@ -272,12 +278,11 @@ public class NetworkBufferPool
      */
     @Override
     public void recycleUnpooledMemorySegments(Collection<MemorySegment> segments) {
-        recycleMemorySegments(segments, segments.size());
-    }
-
-    private void recycleMemorySegments(Collection<MemorySegment> segments, int size) {
         internalRecycleMemorySegments(segments);
+        revertRequiredBuffers(segments.size());
+    }
 
+    private void revertRequiredBuffers(int size) {
         synchronized (factoryLock) {
             numTotalRequiredBuffers -= size;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index b4b56fe..57edaca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -135,6 +135,65 @@ public class LocalBufferPoolTest extends TestLogger {
         }
     }
 
+    @Test(timeout = 10000) // timeout can indicate a potential deadlock
+    public void testReserveSegmentsAndCancel() throws Exception {
+        int totalSegments = 4;
+        int segmentsToReserve = 2;
+
+        NetworkBufferPool globalPool = new NetworkBufferPool(totalSegments, memorySegmentSize);
+        BufferPool localPool1 = globalPool.createBufferPool(segmentsToReserve, totalSegments);
+        List<MemorySegment> segments = new ArrayList<>();
+
+        try {
+            for (int i = 0; i < totalSegments; ++i) {
+                segments.add(localPool1.requestMemorySegmentBlocking());
+            }
+
+            BufferPool localPool2 = globalPool.createBufferPool(segmentsToReserve, totalSegments);
+            // the segment reserve thread will be blocked for no buffer is available
+            Thread reserveThread =
+                    new Thread(
+                            () -> {
+                                try {
+                                    localPool2.reserveSegments(segmentsToReserve);
+                                } catch (Throwable ignored) {
+                                }
+                            });
+            reserveThread.start();
+            Thread.sleep(100); // wait to be blocked
+
+            // the cancel thread can be blocked when redistributing buffers
+            Thread cancelThread =
+                    new Thread(
+                            () -> {
+                                localPool1.lazyDestroy();
+                                localPool2.lazyDestroy();
+                            });
+            cancelThread.start();
+
+            // it is expected that the segment reserve thread can be cancelled successfully
+            Thread interruptThread =
+                    new Thread(
+                            () -> {
+                                try {
+                                    do {
+                                        reserveThread.interrupt();
+                                        Thread.sleep(100);
+                                    } while (reserveThread.isAlive() || cancelThread.isAlive());
+                                } catch (Throwable ignored) {
+                                }
+                            });
+            interruptThread.start();
+
+            interruptThread.join();
+        } finally {
+            segments.forEach(localPool1::recycle);
+            localPool1.lazyDestroy();
+            assertEquals(0, globalPool.getNumberOfUsedMemorySegments());
+            globalPool.destroy();
+        }
+    }
+
     @Test
     public void testRequestMoreThanAvailable() {
         localBufferPool.setNumBuffers(numBuffers);

[flink] 01/02: [hotfix] Rename some methods of NetworkBufferPool and add more comments for better readability

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1c151e46c44ae65b324d3cf8f7e6eb247c328791
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Wed Dec 22 17:02:27 2021 +0800

    [hotfix] Rename some methods of NetworkBufferPool and add more comments for better readability
    
    This closes #18173.
---
 .../flink/core/memory/MemorySegmentProvider.java   |  4 +-
 .../runtime/io/network/buffer/LocalBufferPool.java |  6 +-
 .../io/network/buffer/NetworkBufferPool.java       | 35 ++++++--
 .../network/partition/consumer/BufferManager.java  |  7 +-
 .../io/network/buffer/BufferPoolFactoryTest.java   | 12 +--
 .../io/network/buffer/LocalBufferPoolTest.java     |  4 +-
 .../io/network/buffer/NetworkBufferPoolTest.java   | 93 +++++++++++-----------
 .../network/partition/InputChannelTestUtils.java   | 10 ++-
 8 files changed, 100 insertions(+), 71 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java
index c5fa945..265435d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java
@@ -23,8 +23,8 @@ import java.util.Collection;
 
 /** The provider used for requesting and releasing batch of memory segments. */
 public interface MemorySegmentProvider {
-    Collection<MemorySegment> requestMemorySegments(int numberOfSegmentsToRequest)
+    Collection<MemorySegment> requestUnpooledMemorySegments(int numberOfSegmentsToRequest)
             throws IOException;
 
-    void recycleMemorySegments(Collection<MemorySegment> segments) throws IOException;
+    void recycleUnpooledMemorySegments(Collection<MemorySegment> segments) throws IOException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 600c30d..b2bde83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -243,7 +243,7 @@ class LocalBufferPool implements BufferPool {
 
             if (numberOfRequestedMemorySegments < numberOfSegmentsToReserve) {
                 availableMemorySegments.addAll(
-                        networkBufferPool.requestMemorySegmentsBlocking(
+                        networkBufferPool.requestPooledMemorySegmentsBlocking(
                                 numberOfSegmentsToReserve - numberOfRequestedMemorySegments));
                 toNotify = availabilityHelper.getUnavailableToResetAvailable();
             }
@@ -403,7 +403,7 @@ class LocalBufferPool implements BufferPool {
                 !isDestroyed,
                 "Destroyed buffer pools should never acquire segments - this will lead to buffer leaks.");
 
-        MemorySegment segment = networkBufferPool.requestMemorySegment();
+        MemorySegment segment = networkBufferPool.requestPooledMemorySegment();
         if (segment != null) {
             availableMemorySegments.add(segment);
             numberOfRequestedMemorySegments++;
@@ -647,7 +647,7 @@ class LocalBufferPool implements BufferPool {
         assert Thread.holdsLock(availableMemorySegments);
 
         numberOfRequestedMemorySegments--;
-        networkBufferPool.recycle(segment);
+        networkBufferPool.recyclePooledMemorySegment(segment);
     }
 
     private void returnExcessMemorySegments() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 945e37f..509db03 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemorySegmentProvider;
 import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
@@ -149,27 +150,47 @@ public class NetworkBufferPool
                 segmentSize);
     }
 
+    /**
+     * Different from {@link #requestUnpooledMemorySegments} for unpooled segments allocation. This
+     * method and the below {@link #requestPooledMemorySegmentsBlocking} method are designed to be
+     * used from {@link LocalBufferPool} for pooled memory segments allocation. Note that these
+     * methods for pooled memory segments requesting and recycling are prohibited from acquiring the
+     * factoryLock to avoid deadlock.
+     */
     @Nullable
-    public MemorySegment requestMemorySegment() {
+    public MemorySegment requestPooledMemorySegment() {
         synchronized (availableMemorySegments) {
             return internalRequestMemorySegment();
         }
     }
 
-    public List<MemorySegment> requestMemorySegmentsBlocking(int numberOfSegmentsToRequest)
+    public List<MemorySegment> requestPooledMemorySegmentsBlocking(int numberOfSegmentsToRequest)
             throws IOException {
         return internalRequestMemorySegments(numberOfSegmentsToRequest);
     }
 
-    public void recycle(MemorySegment segment) {
+    /**
+     * Corresponding to {@link #requestPooledMemorySegmentsBlocking} and {@link
+     * #requestPooledMemorySegment}, this method is for pooled memory segments recycling.
+     */
+    public void recyclePooledMemorySegment(MemorySegment segment) {
         // Adds the segment back to the queue, which does not immediately free the memory
         // however, since this happens when references to the global pool are also released,
         // making the availableMemorySegments queue and its contained object reclaimable
         internalRecycleMemorySegments(Collections.singleton(checkNotNull(segment)));
     }
 
+    /**
+     * Unpooled memory segments are requested directly from {@link NetworkBufferPool}, as opposed to
+     * pooled segments, that are requested through {@link BufferPool} that was created from this
+     * {@link NetworkBufferPool} (see {@link #createBufferPool}). They are used for example for
+     * exclusive {@link RemoteInputChannel} credits, that are permanently assigned to that channel,
+     * and never returned to any {@link BufferPool}. As opposed to pooled segments, when requested,
+     * unpooled segments needs to be accounted against {@link #numTotalRequiredBuffers}, which might
+     * require redistribution of the segments.
+     */
     @Override
-    public List<MemorySegment> requestMemorySegments(int numberOfSegmentsToRequest)
+    public List<MemorySegment> requestUnpooledMemorySegments(int numberOfSegmentsToRequest)
             throws IOException {
         checkArgument(
                 numberOfSegmentsToRequest >= 0,
@@ -245,8 +266,12 @@ public class NetworkBufferPool
         return segment;
     }
 
+    /**
+     * Corresponding to {@link #requestUnpooledMemorySegments}, this method is for unpooled memory
+     * segments recycling.
+     */
     @Override
-    public void recycleMemorySegments(Collection<MemorySegment> segments) {
+    public void recycleUnpooledMemorySegments(Collection<MemorySegment> segments) {
         recycleMemorySegments(segments, segments.size());
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
index 41eb12f..db38025 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
@@ -139,7 +139,8 @@ public class BufferManager implements BufferListener, BufferRecycler {
             return;
         }
 
-        Collection<MemorySegment> segments = globalPool.requestMemorySegments(numExclusiveBuffers);
+        Collection<MemorySegment> segments =
+                globalPool.requestUnpooledMemorySegments(numExclusiveBuffers);
         synchronized (bufferQueue) {
             // AvailableBufferQueue::addExclusiveBuffer may release the previously allocated
             // floating buffer, which requires the caller to recycle these released floating
@@ -213,7 +214,7 @@ public class BufferManager implements BufferListener, BufferRecycler {
                 // Similar to notifyBufferAvailable(), make sure that we never add a buffer
                 // after channel released all buffers via releaseAllResources().
                 if (inputChannel.isReleased()) {
-                    globalPool.recycleMemorySegments(Collections.singletonList(segment));
+                    globalPool.recycleUnpooledMemorySegments(Collections.singletonList(segment));
                     return;
                 } else {
                     releasedFloatingBuffer =
@@ -280,7 +281,7 @@ public class BufferManager implements BufferListener, BufferRecycler {
         }
         try {
             if (exclusiveRecyclingSegments.size() > 0) {
-                globalPool.recycleMemorySegments(exclusiveRecyclingSegments);
+                globalPool.recycleUnpooledMemorySegments(exclusiveRecyclingSegments);
             }
         } catch (Exception e) {
             err = firstOrSuppressed(e, err);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
index a305cfa2..19bd2e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
@@ -317,7 +317,7 @@ public class BufferPoolFactoryTest {
             BufferPool first = globalPool.createBufferPool(1, 10);
             assertEquals(10, first.getNumBuffers());
 
-            List<MemorySegment> segmentList1 = globalPool.requestMemorySegments(2);
+            List<MemorySegment> segmentList1 = globalPool.requestUnpooledMemorySegments(2);
             assertEquals(2, segmentList1.size());
             assertEquals(8, first.getNumBuffers());
 
@@ -325,12 +325,12 @@ public class BufferPoolFactoryTest {
             assertEquals(4, first.getNumBuffers());
             assertEquals(4, second.getNumBuffers());
 
-            List<MemorySegment> segmentList2 = globalPool.requestMemorySegments(2);
+            List<MemorySegment> segmentList2 = globalPool.requestUnpooledMemorySegments(2);
             assertEquals(2, segmentList2.size());
             assertEquals(3, first.getNumBuffers());
             assertEquals(3, second.getNumBuffers());
 
-            List<MemorySegment> segmentList3 = globalPool.requestMemorySegments(2);
+            List<MemorySegment> segmentList3 = globalPool.requestUnpooledMemorySegments(2);
             assertEquals(2, segmentList3.size());
             assertEquals(2, first.getNumBuffers());
             assertEquals(2, second.getNumBuffers());
@@ -339,17 +339,17 @@ public class BufferPoolFactoryTest {
                     "Wrong number of available segments after creating buffer pools and requesting segments.";
             assertEquals(msg, 2, globalPool.getNumberOfAvailableMemorySegments());
 
-            globalPool.recycleMemorySegments(segmentList1);
+            globalPool.recycleUnpooledMemorySegments(segmentList1);
             assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments());
             assertEquals(3, first.getNumBuffers());
             assertEquals(3, second.getNumBuffers());
 
-            globalPool.recycleMemorySegments(segmentList2);
+            globalPool.recycleUnpooledMemorySegments(segmentList2);
             assertEquals(msg, 6, globalPool.getNumberOfAvailableMemorySegments());
             assertEquals(4, first.getNumBuffers());
             assertEquals(4, second.getNumBuffers());
 
-            globalPool.recycleMemorySegments(segmentList3);
+            globalPool.recycleUnpooledMemorySegments(segmentList3);
             assertEquals(msg, 8, globalPool.getNumberOfAvailableMemorySegments());
             assertEquals(5, first.getNumBuffers());
             assertEquals(5, second.getNumBuffers());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 37e685f..b4b56fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -624,11 +624,11 @@ public class LocalBufferPoolTest extends TestLogger {
 
         @Nullable
         @Override
-        public MemorySegment requestMemorySegment() {
+        public MemorySegment requestPooledMemorySegment() {
             if (requestCounter++ == 1) {
                 return null;
             }
-            return super.requestMemorySegment();
+            return super.requestPooledMemorySegment();
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 48c0b2e..73877c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -128,7 +128,7 @@ public class NetworkBufferPoolTest extends TestLogger {
 
         NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize);
 
-        MemorySegment segment = globalPool.requestMemorySegment();
+        MemorySegment segment = globalPool.requestPooledMemorySegment();
         assertThat(segment, is(notNullValue()));
 
         assertThat(globalPool.getTotalNumberOfMemorySegments(), is(numBuffers));
@@ -244,8 +244,8 @@ public class NetworkBufferPoolTest extends TestLogger {
     }
 
     /**
-     * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
-     * currently containing the number of required free segments.
+     * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with the {@link
+     * NetworkBufferPool} currently containing the number of required free segments.
      */
     @Test
     public void testRequestMemorySegmentsLessThanTotalBuffers() throws IOException {
@@ -255,21 +255,21 @@ public class NetworkBufferPoolTest extends TestLogger {
 
         List<MemorySegment> memorySegments = Collections.emptyList();
         try {
-            memorySegments = globalPool.requestMemorySegments(numBuffers / 2);
+            memorySegments = globalPool.requestUnpooledMemorySegments(numBuffers / 2);
             assertEquals(memorySegments.size(), numBuffers / 2);
 
-            globalPool.recycleMemorySegments(memorySegments);
+            globalPool.recycleUnpooledMemorySegments(memorySegments);
             memorySegments.clear();
             assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
         } finally {
-            globalPool.recycleMemorySegments(memorySegments); // just in case
+            globalPool.recycleUnpooledMemorySegments(memorySegments); // just in case
             globalPool.destroy();
         }
     }
 
     /**
-     * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required
-     * buffers exceeding the capacity of {@link NetworkBufferPool}.
+     * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with the number of
+     * required buffers exceeding the capacity of {@link NetworkBufferPool}.
      */
     @Test
     public void testRequestMemorySegmentsMoreThanTotalBuffers() {
@@ -278,7 +278,7 @@ public class NetworkBufferPoolTest extends TestLogger {
         NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
 
         try {
-            globalPool.requestMemorySegments(numBuffers + 1);
+            globalPool.requestUnpooledMemorySegments(numBuffers + 1);
             fail("Should throw an IOException");
         } catch (IOException e) {
             assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
@@ -288,22 +288,22 @@ public class NetworkBufferPoolTest extends TestLogger {
     }
 
     /**
-     * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to cause
-     * exception.
+     * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with the invalid argument
+     * to cause exception.
      */
     @Test(expected = IllegalArgumentException.class)
     public void testRequestMemorySegmentsWithInvalidArgument() throws IOException {
         NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
         // the number of requested buffers should be non-negative
-        globalPool.requestMemorySegments(-1);
+        globalPool.requestUnpooledMemorySegments(-1);
         globalPool.destroy();
         fail("Should throw an IllegalArgumentException");
     }
 
     /**
-     * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
-     * currently not containing the number of required free segments (currently occupied by a buffer
-     * pool).
+     * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with the {@link
+     * NetworkBufferPool} currently not containing the number of required free segments (currently
+     * occupied by a buffer pool).
      */
     @Test
     public void testRequestMemorySegmentsWithBuffersTaken()
@@ -346,7 +346,7 @@ public class NetworkBufferPoolTest extends TestLogger {
 
             // take more buffers than are freely available at the moment via requestMemorySegments()
             isRunning.await();
-            memorySegments = networkBufferPool.requestMemorySegments(numBuffers / 2);
+            memorySegments = networkBufferPool.requestUnpooledMemorySegments(numBuffers / 2);
             assertThat(memorySegments, not(hasItem(nullValue())));
         } finally {
             if (bufferRecycler != null) {
@@ -355,21 +355,21 @@ public class NetworkBufferPoolTest extends TestLogger {
             if (lbp1 != null) {
                 lbp1.lazyDestroy();
             }
-            networkBufferPool.recycleMemorySegments(memorySegments);
+            networkBufferPool.recycleUnpooledMemorySegments(memorySegments);
             networkBufferPool.destroy();
         }
     }
 
     /**
-     * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted in
-     * case of a concurrent {@link NetworkBufferPool#destroy()} call.
+     * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)}, verifying it may be
+     * aborted in case of a concurrent {@link NetworkBufferPool#destroy()} call.
      */
     @Test
     public void testRequestMemorySegmentsInterruptable() throws Exception {
         final int numBuffers = 10;
 
         NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
-        MemorySegment segment = globalPool.requestMemorySegment();
+        MemorySegment segment = globalPool.requestPooledMemorySegment();
         assertNotNull(segment);
 
         final OneShotLatch isRunning = new OneShotLatch();
@@ -378,7 +378,7 @@ public class NetworkBufferPoolTest extends TestLogger {
                     @Override
                     public void go() throws IOException {
                         isRunning.trigger();
-                        globalPool.requestMemorySegments(10);
+                        globalPool.requestUnpooledMemorySegments(10);
                     }
                 };
         asyncRequest.start();
@@ -402,15 +402,15 @@ public class NetworkBufferPoolTest extends TestLogger {
     }
 
     /**
-     * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted and
-     * remains in a defined state even if the waiting is interrupted.
+     * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)}, verifying it may be
+     * aborted and remains in a defined state even if the waiting is interrupted.
      */
     @Test
     public void testRequestMemorySegmentsInterruptable2() throws Exception {
         final int numBuffers = 10;
 
         NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
-        MemorySegment segment = globalPool.requestMemorySegment();
+        MemorySegment segment = globalPool.requestPooledMemorySegment();
         assertNotNull(segment);
 
         final OneShotLatch isRunning = new OneShotLatch();
@@ -419,7 +419,7 @@ public class NetworkBufferPoolTest extends TestLogger {
                     @Override
                     public void go() throws IOException {
                         isRunning.trigger();
-                        globalPool.requestMemorySegments(10);
+                        globalPool.requestUnpooledMemorySegments(10);
                     }
                 };
         asyncRequest.start();
@@ -431,7 +431,7 @@ public class NetworkBufferPoolTest extends TestLogger {
         Thread.sleep(10);
         asyncRequest.interrupt();
 
-        globalPool.recycle(segment);
+        globalPool.recyclePooledMemorySegment(segment);
 
         try {
             asyncRequest.sync();
@@ -447,7 +447,7 @@ public class NetworkBufferPoolTest extends TestLogger {
     }
 
     /**
-     * Tests {@link NetworkBufferPool#requestMemorySegments(int)} and verifies it will end
+     * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} and verifies it will end
      * exceptionally when failing to acquire all the segments in the specific timeout.
      */
     @Test
@@ -470,7 +470,7 @@ public class NetworkBufferPoolTest extends TestLogger {
                 new CheckedThread() {
                     @Override
                     public void go() throws Exception {
-                        globalPool.requestMemorySegments(numberOfSegmentsToRequest);
+                        globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
                     }
                 };
 
@@ -489,8 +489,8 @@ public class NetworkBufferPoolTest extends TestLogger {
     /**
      * Tests {@link NetworkBufferPool#isAvailable()}, verifying that the buffer availability is
      * correctly maintained after memory segments are requested by {@link
-     * NetworkBufferPool#requestMemorySegment()} and recycled by {@link
-     * NetworkBufferPool#recycle(MemorySegment)}.
+     * NetworkBufferPool#requestPooledMemorySegment()} and recycled by {@link
+     * NetworkBufferPool#recyclePooledMemorySegment(MemorySegment)}.
      */
     @Test
     public void testIsAvailableOrNotAfterRequestAndRecycleSingleSegment() {
@@ -503,22 +503,22 @@ public class NetworkBufferPoolTest extends TestLogger {
             assertTrue(globalPool.getAvailableFuture().isDone());
 
             // request the first segment
-            final MemorySegment segment1 = checkNotNull(globalPool.requestMemorySegment());
+            final MemorySegment segment1 = checkNotNull(globalPool.requestPooledMemorySegment());
             assertTrue(globalPool.getAvailableFuture().isDone());
 
             // request the second segment
-            final MemorySegment segment2 = checkNotNull(globalPool.requestMemorySegment());
+            final MemorySegment segment2 = checkNotNull(globalPool.requestPooledMemorySegment());
             assertFalse(globalPool.getAvailableFuture().isDone());
 
             final CompletableFuture<?> availableFuture = globalPool.getAvailableFuture();
 
             // recycle the first segment
-            globalPool.recycle(segment1);
+            globalPool.recyclePooledMemorySegment(segment1);
             assertTrue(availableFuture.isDone());
             assertTrue(globalPool.getAvailableFuture().isDone());
 
             // recycle the second segment
-            globalPool.recycle(segment2);
+            globalPool.recyclePooledMemorySegment(segment2);
             assertTrue(globalPool.getAvailableFuture().isDone());
 
         } finally {
@@ -529,8 +529,8 @@ public class NetworkBufferPoolTest extends TestLogger {
     /**
      * Tests {@link NetworkBufferPool#isAvailable()}, verifying that the buffer availability is
      * correctly maintained after memory segments are requested by {@link
-     * NetworkBufferPool#requestMemorySegments(int)} and recycled by {@link
-     * NetworkBufferPool#recycleMemorySegments(Collection)}.
+     * NetworkBufferPool#requestUnpooledMemorySegments(int)} and recycled by {@link
+     * NetworkBufferPool#recycleUnpooledMemorySegments(Collection)}.
      */
     @Test(timeout = 10000L)
     public void testIsAvailableOrNotAfterRequestAndRecycleMultiSegments()
@@ -546,13 +546,13 @@ public class NetworkBufferPoolTest extends TestLogger {
 
             // request 5 segments
             List<MemorySegment> segments1 =
-                    globalPool.requestMemorySegments(numberOfSegmentsToRequest);
+                    globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
             assertTrue(globalPool.getAvailableFuture().isDone());
             assertEquals(numberOfSegmentsToRequest, segments1.size());
 
             // request another 5 segments
             List<MemorySegment> segments2 =
-                    globalPool.requestMemorySegments(numberOfSegmentsToRequest);
+                    globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
             assertFalse(globalPool.getAvailableFuture().isDone());
             assertEquals(numberOfSegmentsToRequest, segments2.size());
 
@@ -565,7 +565,8 @@ public class NetworkBufferPoolTest extends TestLogger {
                         public void go() throws Exception {
                             // this request should be blocked until at least 5 segments are recycled
                             segments3.addAll(
-                                    globalPool.requestMemorySegments(numberOfSegmentsToRequest));
+                                    globalPool.requestUnpooledMemorySegments(
+                                            numberOfSegmentsToRequest));
                             latch.countDown();
                         }
                     };
@@ -573,7 +574,7 @@ public class NetworkBufferPoolTest extends TestLogger {
 
             // recycle 5 segments
             CompletableFuture<?> availableFuture = globalPool.getAvailableFuture();
-            globalPool.recycleMemorySegments(segments1);
+            globalPool.recycleUnpooledMemorySegments(segments1);
             assertTrue(availableFuture.isDone());
 
             // wait util the third request is fulfilled
@@ -582,11 +583,11 @@ public class NetworkBufferPoolTest extends TestLogger {
             assertEquals(numberOfSegmentsToRequest, segments3.size());
 
             // recycle another 5 segments
-            globalPool.recycleMemorySegments(segments2);
+            globalPool.recycleUnpooledMemorySegments(segments2);
             assertTrue(globalPool.getAvailableFuture().isDone());
 
             // recycle the last 5 segments
-            globalPool.recycleMemorySegments(segments3);
+            globalPool.recycleUnpooledMemorySegments(segments3);
             assertTrue(globalPool.getAvailableFuture().isDone());
 
         } finally {
@@ -623,10 +624,10 @@ public class NetworkBufferPoolTest extends TestLogger {
             // request some segments from the global pool in two different ways
             final List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest - 1);
             for (int i = 0; i < numberOfSegmentsToRequest - 1; ++i) {
-                segments.add(globalPool.requestMemorySegment());
+                segments.add(globalPool.requestPooledMemorySegment());
             }
             final List<MemorySegment> exclusiveSegments =
-                    globalPool.requestMemorySegments(
+                    globalPool.requestUnpooledMemorySegments(
                             globalPool.getNumberOfAvailableMemorySegments() - 1);
             assertTrue(globalPool.getAvailableFuture().isDone());
             for (final BufferPool localPool : localBufferPools) {
@@ -673,9 +674,9 @@ public class NetworkBufferPoolTest extends TestLogger {
 
             // recycle the previously requested segments
             for (MemorySegment segment : segments) {
-                globalPool.recycle(segment);
+                globalPool.recyclePooledMemorySegment(segment);
             }
-            globalPool.recycleMemorySegments(exclusiveSegments);
+            globalPool.recycleUnpooledMemorySegments(exclusiveSegments);
 
             assertTrue(globalPoolAvailableFuture.isDone());
             for (CompletableFuture<?> localPoolAvailableFuture : localPoolAvailableFutures) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index 8abcdd1..9d336cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -254,12 +254,13 @@ public class InputChannelTestUtils {
         private StubMemorySegmentProvider() {}
 
         @Override
-        public Collection<MemorySegment> requestMemorySegments(int numberOfSegmentsToRequest) {
+        public Collection<MemorySegment> requestUnpooledMemorySegments(
+                int numberOfSegmentsToRequest) {
             return Collections.emptyList();
         }
 
         @Override
-        public void recycleMemorySegments(Collection<MemorySegment> segments) {}
+        public void recycleUnpooledMemorySegments(Collection<MemorySegment> segments) {}
     }
 
     /** {@link MemorySegmentProvider} that provides unpooled {@link MemorySegment}s. */
@@ -271,12 +272,13 @@ public class InputChannelTestUtils {
         }
 
         @Override
-        public Collection<MemorySegment> requestMemorySegments(int numberOfSegmentsToRequest) {
+        public Collection<MemorySegment> requestUnpooledMemorySegments(
+                int numberOfSegmentsToRequest) {
             return Collections.singletonList(
                     MemorySegmentFactory.allocateUnpooledSegment(pageSize));
         }
 
         @Override
-        public void recycleMemorySegments(Collection<MemorySegment> segments) {}
+        public void recycleUnpooledMemorySegments(Collection<MemorySegment> segments) {}
     }
 }