Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/17 15:22:21 UTC

[GitHub] [flink] 1996fanrui opened a new pull request, #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

1996fanrui opened a new pull request, #19499:
URL: https://github.com/apache/flink/pull/19499

   ## What is the purpose of the change
   
   Add the overdraft buffer in ResultPartition to reduce how long unaligned checkpoints are blocked waiting for output buffers
   
   ## Brief change log
   
   - Add the overdraft buffer in LocalBufferPool
   - Add the configuration option `taskmanager.network.memory.max-overdraft-buffers-per-gate` (a usage sketch follows below)
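   
   For illustration, a minimal sketch of overriding the new option for a local/MiniCluster run; on a real cluster this key would go into the TaskManager's `flink-conf.yaml`, and the value `5` here is only an example, not the option's default:
   
   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   
   // Hedged sketch: the option key is taken from this PR; the value is arbitrary.
   Configuration config = new Configuration();
   config.setInteger("taskmanager.network.memory.max-overdraft-buffers-per-gate", 5);
   StreamExecutionEnvironment env =
           StreamExecutionEnvironment.getExecutionEnvironment(config);
   ```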
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - Added `testOverdraftBuffer` in `LocalBufferPoolTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #19499:
URL: https://github.com/apache/flink/pull/19499#discussion_r901811510


##########
docs/content.zh/docs/ops/metrics.md:
##########
@@ -840,7 +840,7 @@ Deprecated: use [Default shuffle service metrics](#default-shuffle-service)
     </tr>
     <tr>
       <td>outPoolUsage</td>
-      <td>An estimate of the output buffers usage.</td>
+      <td>An estimate of the output buffers usage, the pool usage was > 100% if overdraft buffers are being used.</td>
       <td>Gauge</td>
     </tr>
     <tr>

Review Comment:
   I think that in both the English and Chinese versions there is also a second occurrence of the same metric under `Default shuffle service metrics` that you should modify as well. Here you have only modified the docs for the deprecated component, which is preserved for backward compatibility reasons. 



##########
docs/content.zh/docs/ops/metrics.md:
##########
@@ -840,7 +840,7 @@ Deprecated: use [Default shuffle service metrics](#default-shuffle-service)
     </tr>
     <tr>
       <td>outPoolUsage</td>
-      <td>An estimate of the output buffers usage.</td>
+      <td>An estimate of the output buffers usage, the pool usage was > 100% if overdraft buffers are being used.</td>

Review Comment:
   nit:
   ```
   An estimate of the output buffers usage. The pool usage can be > 100% if overdraft buffers are being used.
   ```





[GitHub] [flink] 1996fanrui commented on a diff in pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #19499:
URL: https://github.com/apache/flink/pull/19499#discussion_r901698881


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -298,7 +318,11 @@ public int getNumBuffers() {
 
     @Override
     public int bestEffortGetNumOfUsedBuffers() {
-        return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size());
+        return Math.max(
+                0,
+                numberOfRequestedMemorySegments
+                        + numberOfRequestedOverdraftMemorySegments
+                        - availableMemorySegments.size());

Review Comment:
   Thanks for your reminder. 
   
   I prefer to report the pool usage as > 100% when overdraft buffers are being used, because overdraft buffers are an overuse and don't belong to the LocalBufferPool's normal capacity. So I prefer this behavior; what do you think?
   
   I updated the metric docs and added checks of `bestEffortGetNumOfUsedBuffers` to the unit tests.
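   
   A minimal sketch of such a check, where `poolSize` and `overdraftInUse` are assumed placeholder names:
   
   ```java
   // With the whole pool plus `overdraftInUse` overdraft segments handed out
   // and nothing returned yet, the best-effort used-buffer count deliberately
   // exceeds the configured pool size.
   assertEquals(poolSize + overdraftInUse, bufferPool.bestEffortGetNumOfUsedBuffers());
   ```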





[GitHub] [flink] 1996fanrui commented on pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #19499:
URL: https://github.com/apache/flink/pull/19499#issuecomment-1158533552

   @flinkbot run azure




[GitHub] [flink] 1996fanrui commented on pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #19499:
URL: https://github.com/apache/flink/pull/19499#issuecomment-1110642698

   > @1996fanrui , thanks for these changes. In general, the idea of the implementation looks good, but I have left several comments on the PR and we still need to discuss the configuration later.
   
   Hi @akalash , thanks for your review. I have read your comments and will address them after our discussion.
   
   I created [FLIP-227](https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer) and started a discussion on the [flink dev mailing list](https://lists.apache.org/thread/4p3xcf0gg4py61hsnydvwpns07d1nog7). Please help review it and add anything that is missing on the dev mailing list. Thank you very much. 




[GitHub] [flink] 1996fanrui commented on pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #19499:
URL: https://github.com/apache/flink/pull/19499#issuecomment-1159334613

   @flinkbot run azure




[GitHub] [flink] 1996fanrui commented on a diff in pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #19499:
URL: https://github.com/apache/flink/pull/19499#discussion_r885340244


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -472,18 +518,22 @@ private void onGlobalPoolAvailable() {
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;
+        return !availableMemorySegments.isEmpty()
+                && unavailableSubpartitionsCount == 0
+                && numberOfRequestedOverdraftMemorySegments == 0;

Review Comment:
   I deleted the equivalent checks in all other places and now use `shouldBeAvailable()` instead, to avoid misuse in the future.





[GitHub] [flink] 1996fanrui commented on pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #19499:
URL: https://github.com/apache/flink/pull/19499#issuecomment-1142154056

   @flinkbot run azure




[GitHub] [flink] 1996fanrui commented on pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #19499:
URL: https://github.com/apache/flink/pull/19499#issuecomment-1160999901

   @flinkbot run azure




[GitHub] [flink] pnowojski merged pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
pnowojski merged PR #19499:
URL: https://github.com/apache/flink/pull/19499




[GitHub] [flink] 1996fanrui commented on a diff in pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #19499:
URL: https://github.com/apache/flink/pull/19499#discussion_r901830575


##########
docs/content.zh/docs/ops/metrics.md:
##########
@@ -840,7 +840,7 @@ Deprecated: use [Default shuffle service metrics](#default-shuffle-service)
     </tr>
     <tr>
       <td>outPoolUsage</td>
-      <td>An estimate of the output buffers usage.</td>
+      <td>An estimate of the output buffers usage, the pool usage was > 100% if overdraft buffers are being used.</td>
       <td>Gauge</td>
     </tr>
     <tr>

Review Comment:
   Thanks for your reminder, updated.





[GitHub] [flink] akalash commented on a diff in pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
akalash commented on code in PR #19499:
URL: https://github.com/apache/flink/pull/19499#discussion_r899148430


##########
docs/layouts/shortcodes/generated/all_taskmanager_network_section.html:
##########
@@ -80,6 +80,12 @@
             <td>Integer</td>
             <td>Number of max buffers that can be used for each channel. If a channel exceeds the number of max buffers, it will make the task become unavailable, cause the back pressure and block the data processing. This might speed up checkpoint alignment by preventing excessive growth of the buffered in-flight data in case of data skew and high number of configured floating buffers. This limit is not strictly guaranteed, and can be ignored by things like flatMap operators, records spanning multiple buffers or single timer producing large amount of data.</td>
         </tr>
+        <tr>
+            <td><h5>taskmanager.network.memory.max-overdraft-buffers-per-gate</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Integer</td>
+            <td>Number of max overdraft network buffers to use for each ResultPartition. The overdraft buffers will be used when the ResultPartition cannot apply to the normal buffers, e.g: the all buffers of ResultPartition be applied or the number of buffers for a single channel has reached taskmanager.network.memory.max-buffers-per-channel. When the ResultPartition is unavailable, the ResultPartition can provide some additional buffers to allow overdraft. It can effectively solve the problem that multiple output buffers are required to process a single data, causing the Task to block in the requestMemory, such as: flatMap operator, records spanning multiple buffers or a single timer generates a large amount of data. It doesn't need to wait for ResultPartition is available to start Unaligned Checkpoint directly.</td>
+        </tr>

Review Comment:
   Just to check: did you generate this documentation automatically (`mvn -Pgenerate-config-docs ...`)?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -549,6 +551,157 @@ public void testConsistentAvailability() throws Exception {
         }
     }
 
+    @Test
+    public void testWithoutOverdraftBuffer() throws Exception {
+        localBufferPool.lazyDestroy();
+        int maxOverdraftBuffers = 0;
+
+        testUseAllBuffersForSingleChannel(4, 3, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(4, 3, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(8, 5, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(8, 5, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(12, 10, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(12, 10, maxOverdraftBuffers);
+    }
+
+    @Test
+    public void testWithOverdraftBuffer() throws Exception {
+        localBufferPool.lazyDestroy();
+        int maxOverdraftBuffers = 2;
+
+        testUseAllBuffersForSingleChannel(4, 3, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(4, 3, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(8, 5, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(8, 5, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(12, 10, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(12, 10, maxOverdraftBuffers);
+    }
+
+    private void testUseAllBuffersForSingleChannel(
+            int poolSize, int maxBuffersPerChannel, int maxOverdraftBuffers) throws Exception {
+        int numberOfSubpartitions = 2;
+        checkArgument(maxBuffersPerChannel > poolSize / numberOfSubpartitions);
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        1,
+                        Integer.MAX_VALUE,
+                        numberOfSubpartitions,
+                        maxBuffersPerChannel,
+                        maxOverdraftBuffers);
+        bufferPool.setNumBuffers(poolSize);
+
+        // Request all buffers inside the buffer pool
+        AutoCloseableRegistry channelCloseableRegistry = new AutoCloseableRegistry();
+        for (int i = 0; i < maxBuffersPerChannel; i++) {
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, true);
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            channelCloseableRegistry.registerCloseable(bufferBuilder);
+        }
+
+        AutoCloseableRegistry poolCloseableRegistry = new AutoCloseableRegistry();
+        for (int i = maxBuffersPerChannel; i < poolSize; i++) {
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            poolCloseableRegistry.registerCloseable(bufferBuilder);
+        }
+
+        // request overdraft buffer
+        AutoCloseableRegistry overdraftCloseableRegistry = new AutoCloseableRegistry();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+        for (int i = 0; i < maxOverdraftBuffers; i++) {
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            overdraftCloseableRegistry.registerCloseable(bufferBuilder);
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, i + 1, false);
+        }
+
+        BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(1);

Review Comment:
   I think it makes sense to check that `requestBufferBuilder` for channel `0` returns a null value as well (see the sketch below).
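   
   A minimal sketch of the suggested assertion, reusing the test's names:
   
   ```java
   // After the overdraft budget is exhausted, a request for the saturated
   // channel 0 should also be refused, not only the channel 1 request above.
   assertNull(bufferPool.requestBufferBuilder(0));
   ```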



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -549,6 +551,157 @@ public void testConsistentAvailability() throws Exception {
         }
     }
 
+    @Test
+    public void testWithoutOverdraftBuffer() throws Exception {
+        localBufferPool.lazyDestroy();
+        int maxOverdraftBuffers = 0;
+
+        testUseAllBuffersForSingleChannel(4, 3, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(4, 3, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(8, 5, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(8, 5, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(12, 10, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(12, 10, maxOverdraftBuffers);
+    }
+
+    @Test
+    public void testWithOverdraftBuffer() throws Exception {
+        localBufferPool.lazyDestroy();
+        int maxOverdraftBuffers = 2;
+
+        testUseAllBuffersForSingleChannel(4, 3, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(4, 3, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(8, 5, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(8, 5, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(12, 10, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(12, 10, maxOverdraftBuffers);
+    }
+
+    private void testUseAllBuffersForSingleChannel(
+            int poolSize, int maxBuffersPerChannel, int maxOverdraftBuffers) throws Exception {
+        int numberOfSubpartitions = 2;
+        checkArgument(maxBuffersPerChannel > poolSize / numberOfSubpartitions);
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        1,
+                        Integer.MAX_VALUE,
+                        numberOfSubpartitions,
+                        maxBuffersPerChannel,
+                        maxOverdraftBuffers);
+        bufferPool.setNumBuffers(poolSize);
+
+        // Request all buffers inside the buffer pool
+        AutoCloseableRegistry channelCloseableRegistry = new AutoCloseableRegistry();
+        for (int i = 0; i < maxBuffersPerChannel; i++) {
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, true);
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            channelCloseableRegistry.registerCloseable(bufferBuilder);
+        }
+
+        AutoCloseableRegistry poolCloseableRegistry = new AutoCloseableRegistry();
+        for (int i = maxBuffersPerChannel; i < poolSize; i++) {
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            poolCloseableRegistry.registerCloseable(bufferBuilder);
+        }
+
+        // request overdraft buffer
+        AutoCloseableRegistry overdraftCloseableRegistry = new AutoCloseableRegistry();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+        for (int i = 0; i < maxOverdraftBuffers; i++) {
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            overdraftCloseableRegistry.registerCloseable(bufferBuilder);
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, i + 1, false);
+        }
+
+        BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(1);
+        assertNull(bufferBuilder);
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, maxOverdraftBuffers, false);
+
+        // release all bufferBuilder
+        overdraftCloseableRegistry.close();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+        poolCloseableRegistry.close();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+        channelCloseableRegistry.close();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, true);
+
+        bufferPool.lazyDestroy();
+    }
+
+    private void testUseAllBuffersForMultiChannel(

Review Comment:
   I may be mistaken, but this method looks the same as the one above (`testUseAllBuffersForSingleChannel`). I think you can merge them into one; where you want to use different channels, you can use `i % numberOfChannels` (see the sketch below).
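   
   For illustration, a minimal sketch of the merged loop under that suggestion; `closeableRegistry` stands in for the per-phase registries used above:
   
   ```java
   // One loop covering both variants: the modulo spreads requests across all
   // channels, while passing a constant channel index instead reproduces the
   // single-channel scenario.
   for (int i = 0; i < poolSize; i++) {
       BufferBuilder bufferBuilder =
               bufferPool.requestBufferBuilder(i % numberOfSubpartitions);
       assertNotNull(bufferBuilder);
       closeableRegistry.registerCloseable(bufferBuilder);
   }
   ```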



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -549,6 +551,157 @@ public void testConsistentAvailability() throws Exception {
         }
     }
 
+    @Test
+    public void testWithoutOverdraftBuffer() throws Exception {
+        localBufferPool.lazyDestroy();
+        int maxOverdraftBuffers = 0;
+
+        testUseAllBuffersForSingleChannel(4, 3, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(4, 3, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(8, 5, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(8, 5, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(12, 10, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(12, 10, maxOverdraftBuffers);
+    }
+
+    @Test
+    public void testWithOverdraftBuffer() throws Exception {
+        localBufferPool.lazyDestroy();
+        int maxOverdraftBuffers = 2;
+
+        testUseAllBuffersForSingleChannel(4, 3, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(4, 3, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(8, 5, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(8, 5, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(12, 10, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(12, 10, maxOverdraftBuffers);
+    }
+
+    private void testUseAllBuffersForSingleChannel(
+            int poolSize, int maxBuffersPerChannel, int maxOverdraftBuffers) throws Exception {
+        int numberOfSubpartitions = 2;
+        checkArgument(maxBuffersPerChannel > poolSize / numberOfSubpartitions);
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        1,
+                        Integer.MAX_VALUE,
+                        numberOfSubpartitions,
+                        maxBuffersPerChannel,
+                        maxOverdraftBuffers);
+        bufferPool.setNumBuffers(poolSize);
+
+        // Request all buffers inside the buffer pool
+        AutoCloseableRegistry channelCloseableRegistry = new AutoCloseableRegistry();
+        for (int i = 0; i < maxBuffersPerChannel; i++) {
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, true);
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            channelCloseableRegistry.registerCloseable(bufferBuilder);
+        }
+
+        AutoCloseableRegistry poolCloseableRegistry = new AutoCloseableRegistry();
+        for (int i = maxBuffersPerChannel; i < poolSize; i++) {
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            poolCloseableRegistry.registerCloseable(bufferBuilder);
+        }
+
+        // request overdraft buffer
+        AutoCloseableRegistry overdraftCloseableRegistry = new AutoCloseableRegistry();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+        for (int i = 0; i < maxOverdraftBuffers; i++) {
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            overdraftCloseableRegistry.registerCloseable(bufferBuilder);
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, i + 1, false);
+        }
+
+        BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(1);
+        assertNull(bufferBuilder);
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, maxOverdraftBuffers, false);
+
+        // release all bufferBuilder
+        overdraftCloseableRegistry.close();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+        poolCloseableRegistry.close();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+        channelCloseableRegistry.close();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, true);
+
+        bufferPool.lazyDestroy();
+    }
+
+    private void testUseAllBuffersForMultiChannel(
+            int poolSize, int maxBuffersPerChannel, int maxOverdraftBuffers) throws Exception {
+        int numberOfSubpartitions = 2;
+        checkArgument(maxBuffersPerChannel > poolSize / numberOfSubpartitions);
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        1,
+                        Integer.MAX_VALUE,
+                        numberOfSubpartitions,
+                        maxBuffersPerChannel,
+                        maxOverdraftBuffers);
+        bufferPool.setNumBuffers(poolSize);
+
+        // Request all buffers inside the buffer pool
+        AutoCloseableRegistry channel0CloseableRegistry = new AutoCloseableRegistry();
+        for (int i = 0; i < poolSize / 2; i++) {
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, true);
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);

Review Comment:
   For example here:
   ```
   BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(i % numberOfChannels);
   ```





[GitHub] [flink] 1996fanrui commented on pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #19499:
URL: https://github.com/apache/flink/pull/19499#issuecomment-1141802853

   Hi @akalash @pnowojski , I updated this PR based on FLIP-227 and addressed all of @akalash's comments. Please help review it in your free time, thanks a lot.




[GitHub] [flink] 1996fanrui commented on pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #19499:
URL: https://github.com/apache/flink/pull/19499#issuecomment-1159015244

   @flinkbot run azure




[GitHub] [flink] 1996fanrui commented on pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #19499:
URL: https://github.com/apache/flink/pull/19499#issuecomment-1152319736

   > @1996fanrui , thanks for these changes. I have left a couple of minor comments, but in general it looks good. I want to take some time to think about how good the current tests are, and then I will be ready to approve this PR.
   
   @akalash , thanks for your review. I have updated the PR. 




[GitHub] [flink] 1996fanrui commented on pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #19499:
URL: https://github.com/apache/flink/pull/19499#issuecomment-1158503414

   Hi @akalash , thanks for your suggestion. The unit tests are now simpler than before.




[GitHub] [flink] 1996fanrui commented on a diff in pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #19499:
URL: https://github.com/apache/flink/pull/19499#discussion_r899769698


##########
docs/layouts/shortcodes/generated/all_taskmanager_network_section.html:
##########
@@ -80,6 +80,12 @@
             <td>Integer</td>
             <td>Number of max buffers that can be used for each channel. If a channel exceeds the number of max buffers, it will make the task become unavailable, cause the back pressure and block the data processing. This might speed up checkpoint alignment by preventing excessive growth of the buffered in-flight data in case of data skew and high number of configured floating buffers. This limit is not strictly guaranteed, and can be ignored by things like flatMap operators, records spanning multiple buffers or single timer producing large amount of data.</td>
         </tr>
+        <tr>
+            <td><h5>taskmanager.network.memory.max-overdraft-buffers-per-gate</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Integer</td>
+            <td>Number of max overdraft network buffers to use for each ResultPartition. The overdraft buffers will be used when the ResultPartition cannot apply to the normal buffers, e.g: the all buffers of ResultPartition be applied or the number of buffers for a single channel has reached taskmanager.network.memory.max-buffers-per-channel. When the ResultPartition is unavailable, the ResultPartition can provide some additional buffers to allow overdraft. It can effectively solve the problem that multiple output buffers are required to process a single data, causing the Task to block in the requestMemory, such as: flatMap operator, records spanning multiple buffers or a single timer generates a large amount of data. It doesn't need to wait for ResultPartition is available to start Unaligned Checkpoint directly.</td>
+        </tr>

Review Comment:
   Yes, the CI will fail if the code and the docs don't match.





[GitHub] [flink] 1996fanrui commented on a diff in pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #19499:
URL: https://github.com/apache/flink/pull/19499#discussion_r901698881


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -298,7 +318,11 @@ public int getNumBuffers() {
 
     @Override
     public int bestEffortGetNumOfUsedBuffers() {
-        return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size());
+        return Math.max(
+                0,
+                numberOfRequestedMemorySegments
+                        + numberOfRequestedOverdraftMemorySegments
+                        - availableMemorySegments.size());

Review Comment:
   Thanks for your reminder. 
   
   I prefer to report the pool usage as > 100% when overdraft buffers are being used, because overdraft buffers are an overuse and don't belong to the LocalBufferPool's normal capacity. So I prefer this behavior; what do you think?



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -273,6 +273,26 @@ public class NettyShuffleEnvironmentOptions {
                                     + " case of data skew and high number of configured floating buffers. This limit is not strictly guaranteed,"
                                     + " and can be ignored by things like flatMap operators, records spanning multiple buffers or single timer"
                                     + " producing large amount of data.");
+    /**
+     * Number of max overdraft network buffers to use for each ResultPartition. Currently, it just
+     * supports ResultPartition. InputGate can be supported if necessary.
+     */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> NETWORK_MAX_OVERDRAFT_BUFFERS_PER_GATE =
+            key("taskmanager.network.memory.max-overdraft-buffers-per-gate")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            String.format(
+                                    "Number of max overdraft network buffers to use for each ResultPartition."
+                                            + " The overdraft buffers will be used when the ResultPartition cannot apply to the normal buffers from the"
+                                            + " LocalBufferPool(LocalBufferPool is unavailable), e.g: the all buffers of LocalBufferPool be applied or"
+                                            + " the number of buffers for a single channel has reached %s. When the LocalBufferPool is unavailable,"
+                                            + " the LocalBufferPool can provide some additional buffers to allow overdraft. It can effectively solve"
+                                            + " the problem that multiple output buffers are required to process a single data, causing the Task to block in the requestMemory,"
+                                            + " such as: flatMap operator, records spanning multiple buffers or a single timer generates a large amount of data."
+                                            + " It doesn't need to wait for ResultPartition is available to start Unaligned Checkpoint directly.",
+                                    NETWORK_MAX_BUFFERS_PER_CHANNEL.key()));

Review Comment:
   Hi @pnowojski , thanks for your review, I have updated it. 
   
   But I still use ResultPartition in the first sentence, because a task has multiple ResultPartitions when the operator has multiple downstream operators, and each ResultPartition has its own overdraft buffers.





[GitHub] [flink] flinkbot commented on pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19499:
URL: https://github.com/apache/flink/pull/19499#issuecomment-1100901336

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9056d0439b007c8308ae07a274730395268b0942",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "9056d0439b007c8308ae07a274730395268b0942",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9056d0439b007c8308ae07a274730395268b0942 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] akalash commented on a diff in pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
akalash commented on code in PR #19499:
URL: https://github.com/apache/flink/pull/19499#discussion_r858777920


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -375,11 +399,14 @@ private MemorySegment requestMemorySegment(int targetChannel) {
             // target channel over quota; do not return a segment
             if (targetChannel != UNKNOWN_CHANNEL
                     && subpartitionBuffersCount[targetChannel] >= maxBuffersPerChannel) {
-                return null;
+                segment = requestOverdraftMemorySegmentFromGlobal(targetChannel);
+            } else {
+                segment =
+                        availableMemorySegments.isEmpty()
+                                ? requestOverdraftMemorySegmentFromGlobal(targetChannel)

Review Comment:
   It is up to you, but in my opinion `requestOverdraftMemorySegmentFromGlobal` being used in both the `if` and `else` blocks looks weird. Don't you think it would be better to just update the `if` condition:
   ```
   if (targetChannel != UNKNOWN_CHANNEL
                   && subpartitionBuffersCount[targetChannel] >= maxBuffersPerChannel
           || availableMemorySegments.isEmpty()) {
       segment = requestOverdraftMemorySegmentFromGlobal(targetChannel);
   } else {
       segment = availableMemorySegments.poll();
   }
   ```
   
   I am also not really sure that we should use overdraft buffers while we still have `availableMemorySegments`. Perhaps we just need to ignore `maxBuffersPerChannel`:
   
   ```
   if (availableMemorySegments.isEmpty()) {
       segment = requestOverdraftMemorySegmentFromGlobal(targetChannel);
   } else {
       segment = availableMemorySegments.poll();
   }
   ```
   But I don't know how legal that is. I need to think ...
   



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -273,6 +273,26 @@ public class NettyShuffleEnvironmentOptions {
                                     + " case of data skew and high number of configured floating buffers. This limit is not strictly guaranteed,"
                                     + " and can be ignored by things like flatMap operators, records spanning multiple buffers or single timer"
                                     + " producing large amount of data.");
+    /**
+     * Number of max overdraft network buffers to use for each ResultPartition. Currently, it just
+     * supports ResultPartition. InputGate can be supported if necessary.

Review Comment:
   Too much repeated information. `Number of max overdraft network buffers to use for each ResultPartition.` is enough, in my opinion; the other sentences just repeat it.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -430,6 +457,25 @@ private boolean requestMemorySegmentFromGlobal() {
         return false;
     }
 
+    private MemorySegment requestOverdraftMemorySegmentFromGlobal(int targetChannel) {
+        assert Thread.holdsLock(availableMemorySegments);
+
+        if (numberOfRequestedOverdraftMemorySegments >= maxOverdraftBuffersPerGate
+                || targetChannel == UNKNOWN_CHANNEL) {

Review Comment:
   Why don't we use overdraft buffers for UNKNOWN_CHANNEL?



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -273,6 +273,26 @@ public class NettyShuffleEnvironmentOptions {
                                     + " case of data skew and high number of configured floating buffers. This limit is not strictly guaranteed,"
                                     + " and can be ignored by things like flatMap operators, records spanning multiple buffers or single timer"
                                     + " producing large amount of data.");
+    /**
+     * Number of max overdraft network buffers to use for each ResultPartition. Currently, it just
+     * supports ResultPartition. InputGate can be supported if necessary.
+     */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> NETWORK_MAX_OVERDRAFT_BUFFERS_PER_GATE =
+            key("taskmanager.network.memory.max-overdraft-buffers-per-gate")
+                    .intType()
+                    .defaultValue(10)

Review Comment:
   Default value should be discussed later.



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -273,6 +273,26 @@ public class NettyShuffleEnvironmentOptions {
                                     + " case of data skew and high number of configured floating buffers. This limit is not strictly guaranteed,"
                                     + " and can be ignored by things like flatMap operators, records spanning multiple buffers or single timer"
                                     + " producing large amount of data.");
+    /**
+     * Number of max overdraft network buffers to use for each ResultPartition. Currently, it just
+     * supports ResultPartition. InputGate can be supported if necessary.
+     */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> NETWORK_MAX_OVERDRAFT_BUFFERS_PER_GATE =
+            key("taskmanager.network.memory.max-overdraft-buffers-per-gate")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            String.format(
+                                    "Number of max overdraft network buffers to use for each ResultPartition."
+                                            + " The overdraft buffers will be used when the ResultPartition cannot apply to the normal buffers from the"
+                                            + " LocalBufferPool(LocalBufferPool is unavailable), e.g: the all buffers of LocalBufferPool be applied or"
+                                            + " the number of buffers for a single channel has reached %s. When the LocalBufferPool is unavailable,"
+                                            + " the LocalBufferPool can provide some additional buffers to allow overdraft. It can effectively solve"
+                                            + " the problem that multiple output buffers are required to process a single data, causing the Task to block in the requestMemory,"
+                                            + " such as: flatMap operator, records spanning multiple buffers or a single timer generates a large amount of data."
+                                            + " It doesn't need to wait for ResultPartition is available to start Unaligned Checkpoint directly.",
+                                    NETWORK_MAX_BUFFERS_PER_CHANNEL.key()));

Review Comment:
   I am not really sure that we should mention the LocalBufferPool (the user would then need to check another configuration). This is user documentation; we should simply explain how and when this configuration can help the user.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -543,6 +543,164 @@ public void testConsistentAvailability() throws Exception {
         }
     }
 
+    @Test
+    public void testOverdraftBuffer() throws IOException, InterruptedException {

Review Comment:
   Why do you keep all scenarios in one test? It looks too big and difficult to read. Can you please split this test into several tests with descriptive names?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -472,18 +518,22 @@ private void onGlobalPoolAvailable() {
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;
+        return !availableMemorySegments.isEmpty()
+                && unavailableSubpartitionsCount == 0
+                && numberOfRequestedOverdraftMemorySegments == 0;

Review Comment:
   I see the pattern `unavailableSubpartitionsCount == 0 && numberOfRequestedOverdraftMemorySegments == 0` in a lot of places. I suppose we need to extract it into a separate method in order to avoid misuse in the future (see the sketch below).
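   
   For illustration, one possible extraction; the helper's name is an assumption, not from the PR:
   
   ```java
   // Hypothetical helper consolidating the repeated condition:
   private boolean noSubpartitionUnavailableAndNoOverdraft() {
       return unavailableSubpartitionsCount == 0
               && numberOfRequestedOverdraftMemorySegments == 0;
   }
   
   // shouldBeAvailable() would then read:
   private boolean shouldBeAvailable() {
       assert Thread.holdsLock(availableMemorySegments);
       return !availableMemorySegments.isEmpty()
               && noSubpartitionUnavailableAndNoOverdraft();
   }
   ```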





[GitHub] [flink] akalash commented on a diff in pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
akalash commented on code in PR #19499:
URL: https://github.com/apache/flink/pull/19499#discussion_r894429639


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -676,7 +726,9 @@ private void returnExcessMemorySegments() {
     }
 
     private boolean hasExcessBuffers() {
-        return numberOfRequestedMemorySegments > currentPoolSize;
+        return numberOfRequestedOverdraftMemorySegments > 0
+                || numberOfRequestedMemorySegments + numberOfRequestedOverdraftMemorySegments
+                        > currentPoolSize;

Review Comment:
   I think you overcomplicated the condition; I believe this is an easier equivalent:
   ```
   private boolean hasExcessBuffers() {
       return numberOfRequestedOverdraftMemorySegments > 0
               || numberOfRequestedMemorySegments > currentPoolSize;
   }
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -424,6 +452,24 @@ private boolean requestMemorySegmentFromGlobal() {
         return false;
     }
 
+    private MemorySegment requestOverdraftMemorySegmentFromGlobal(int targetChannel) {

Review Comment:
   It looks like `targetChannel` can be removed here, since this parameter is not used.





[GitHub] [flink] pnowojski commented on a diff in pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #19499:
URL: https://github.com/apache/flink/pull/19499#discussion_r901594695


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -298,7 +318,11 @@ public int getNumBuffers() {
 
     @Override
     public int bestEffortGetNumOfUsedBuffers() {
-        return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size());
+        return Math.max(
+                0,
+                numberOfRequestedMemorySegments
+                        + numberOfRequestedOverdraftMemorySegments
+                        - availableMemorySegments.size());

Review Comment:
   This doesn't seem to be correct in combination with `LocalBufferPool#getNumBuffers`. Please take a look at how those methods are used in `OutputBufferPoolUsageGauge#getValue`.
   
   Unless you intend to report that the pool usage is > 100% if overdraft buffers are being used? If so, I think it should be documented in `docs/content/docs/ops/metrics.md` and `docs/content.zh/docs/ops/metrics.md`.
   
   Either way, is this being tested somewhere?
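   
   For context, a simplified sketch of the usage computation referred to above; the shape is assumed from the two methods in question and is not the exact Flink source:
   
   ```java
   // Aggregate usage across a task's output buffer pools. Overdraft segments
   // are counted by bestEffortGetNumOfUsedBuffers() but not by getNumBuffers(),
   // so the reported usage can exceed 100%.
   static float outPoolUsage(Iterable<BufferPool> bufferPools) {
       int usedBuffers = 0;
       int totalBuffers = 0;
       for (BufferPool pool : bufferPools) {
           usedBuffers += pool.bestEffortGetNumOfUsedBuffers();
           totalBuffers += pool.getNumBuffers();
       }
       return totalBuffers == 0 ? 0.0f : ((float) usedBuffers) / totalBuffers;
   }
   ```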



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -273,6 +273,26 @@ public class NettyShuffleEnvironmentOptions {
                                     + " case of data skew and high number of configured floating buffers. This limit is not strictly guaranteed,"
                                     + " and can be ignored by things like flatMap operators, records spanning multiple buffers or single timer"
                                     + " producing large amount of data.");
+    /**
+     * Number of max overdraft network buffers to use for each ResultPartition. Currently, it just
+     * supports ResultPartition. InputGate can be supported if necessary.
+     */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> NETWORK_MAX_OVERDRAFT_BUFFERS_PER_GATE =
+            key("taskmanager.network.memory.max-overdraft-buffers-per-gate")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            String.format(
+                                    "Number of max overdraft network buffers to use for each ResultPartition."
+                                            + " The overdraft buffers will be used when the ResultPartition cannot apply to the normal buffers from the"
+                                            + " LocalBufferPool(LocalBufferPool is unavailable), e.g: the all buffers of LocalBufferPool be applied or"
+                                            + " the number of buffers for a single channel has reached %s. When the LocalBufferPool is unavailable,"
+                                            + " the LocalBufferPool can provide some additional buffers to allow overdraft. It can effectively solve"
+                                            + " the problem that multiple output buffers are required to process a single data, causing the Task to block in the requestMemory,"
+                                            + " such as: flatMap operator, records spanning multiple buffers or a single timer generates a large amount of data."
+                                            + " It doesn't need to wait for ResultPartition is available to start Unaligned Checkpoint directly.",
+                                    NETWORK_MAX_BUFFERS_PER_CHANNEL.key()));

Review Comment:
   I agree this is a bit too technical. What about:
   > Number of max overdraft network buffers to use for each subtask.
   >
   > The overdraft buffers will be used when the subtask cannot apply for the normal buffers due to back pressure,
   > while the subtask is performing an action that cannot be interrupted in the middle, like serializing a large record,
   > a flatMap operator producing multiple records for one single input record, or a processing time timer producing a large output.
   > In situations like that, the system will allow the subtask to request overdraft buffers, so that the subtask can finish such
   > an uninterruptible action without blocking unaligned checkpoints for a long period of time. Overdraft buffers
   > are provided on a best-effort basis only if the system has some unused buffers available. A subtask that has used
   > overdraft buffers won't be allowed to process any more records until the overdraft buffers are returned to the pool.
   
   ?





[GitHub] [flink] 1996fanrui commented on pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #19499:
URL: https://github.com/apache/flink/pull/19499#issuecomment-1160637227

   @flinkbot run azure

