Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/26 14:31:50 UTC

[GitHub] [flink] akalash commented on a diff in pull request #19499: [FLINK-26762][network] Add the overdraft buffer in ResultPartition

akalash commented on code in PR #19499:
URL: https://github.com/apache/flink/pull/19499#discussion_r858777920


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -375,11 +399,14 @@ private MemorySegment requestMemorySegment(int targetChannel) {
             // target channel over quota; do not return a segment
             if (targetChannel != UNKNOWN_CHANNEL
                     && subpartitionBuffersCount[targetChannel] >= maxBuffersPerChannel) {
-                return null;
+                segment = requestOverdraftMemorySegmentFromGlobal(targetChannel);
+            } else {
+                segment =
+                        availableMemorySegments.isEmpty()
+                                ? requestOverdraftMemorySegmentFromGlobal(targetChannel)

Review Comment:
   It is up to you, but in my opinion calling `requestOverdraftMemorySegmentFromGlobal` in both the `if` and the `else` block looks weird. Don't you think it would be better to just update the `if` condition:
   ```
   if ((targetChannel != UNKNOWN_CHANNEL
                   && subpartitionBuffersCount[targetChannel] >= maxBuffersPerChannel)
           || availableMemorySegments.isEmpty()) {
       segment = requestOverdraftMemorySegmentFromGlobal(targetChannel);
   } else {
       segment = availableMemorySegments.poll();
   }
   ```
   
   I am also not really sure that we should use the overdraft while we still have `availableMemorySegments`. Perhaps we should just ignore `maxBuffersPerChannel`:
   
   ```
   if (availableMemorySegments.isEmpty()) {
       segment = requestOverdraftMemorySegmentFromGlobal(targetChannel);
   } else {
       segment = availableMemorySegments.poll();
   }
   ```
   But I don't know how legal that is; I need to think about it more.
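   To make the difference between the two suggestions concrete, here is a minimal, self-contained model of just the source-selection step. It is a sketch under stated assumptions, not the PR's code: `OverdraftPolicySketch`, `Source` and the `Integer` segments are hypothetical, and the global pool, locking and the overdraft limit are left out.

   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;

   /**
    * Hypothetical model of the two request policies discussed in this comment.
    * Segments are plain Integers; the global pool and locking are not modeled.
    */
   public class OverdraftPolicySketch {

       static final int UNKNOWN_CHANNEL = -1;

       /** Where a requested segment would come from. */
       enum Source { LOCAL_POOL, OVERDRAFT }

       /** Suggestion 1: overdraft if the channel is over quota OR the local pool is empty. */
       static Source combinedCondition(
               int targetChannel,
               int[] subpartitionBuffersCount,
               int maxBuffersPerChannel,
               Queue<Integer> availableMemorySegments) {
           // Short-circuit keeps UNKNOWN_CHANNEL (-1) from indexing the array.
           boolean channelOverQuota =
                   targetChannel != UNKNOWN_CHANNEL
                           && subpartitionBuffersCount[targetChannel] >= maxBuffersPerChannel;
           if (channelOverQuota || availableMemorySegments.isEmpty()) {
               return Source.OVERDRAFT;
           }
           return Source.LOCAL_POOL;
       }

       /** Suggestion 2: ignore maxBuffersPerChannel; overdraft only when the local pool is empty. */
       static Source ignoreChannelQuota(Queue<Integer> availableMemorySegments) {
           return availableMemorySegments.isEmpty() ? Source.OVERDRAFT : Source.LOCAL_POOL;
       }

       public static void main(String[] args) {
           Queue<Integer> pool = new ArrayDeque<>();
           pool.add(42); // one free local segment
           int[] counts = {3, 0}; // channel 0 already holds 3 buffers
           int maxPerChannel = 3;

           // Channel 0 is over quota while the local pool still has a segment:
           System.out.println(combinedCondition(0, counts, maxPerChannel, pool)); // OVERDRAFT
           System.out.println(ignoreChannelQuota(pool)); // LOCAL_POOL
       }
   }
   ```

   Note that `&&` binds tighter than `||` in Java, so the condition in the first suggestion already groups as `(channel over quota) || pool empty`. The two variants only disagree when a channel is over quota while the local pool still has free segments, which is exactly the case the second question is about.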
   



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -273,6 +273,26 @@ public class NettyShuffleEnvironmentOptions {
                                     + " case of data skew and high number of configured floating buffers. This limit is not strictly guaranteed,"
                                     + " and can be ignored by things like flatMap operators, records spanning multiple buffers or single timer"
                                     + " producing large amount of data.");
+    /**
+     * Number of max overdraft network buffers to use for each ResultPartition. Currently, it just
+     * supports ResultPartition. InputGate can be supported if necessary.

Review Comment:
   Too much repeated information. In my opinion, `Number of max overdraft network buffers to use for each ResultPartition.` is enough; the other sentences just repeat it.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -430,6 +457,25 @@ private boolean requestMemorySegmentFromGlobal() {
         return false;
     }
 
+    private MemorySegment requestOverdraftMemorySegmentFromGlobal(int targetChannel) {
+        assert Thread.holdsLock(availableMemorySegments);
+
+        if (numberOfRequestedOverdraftMemorySegments >= maxOverdraftBuffersPerGate
+                || targetChannel == UNKNOWN_CHANNEL) {

Review Comment:
   Why don't we use the overdraft for `UNKNOWN_CHANNEL`?
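   For discussing this question, here is a minimal sketch of what the guard in this hunk does. The hunk is cut off after the condition, so it assumes that a refused request returns `null` (as the over-quota path did before this PR) and that a granted request increments `numberOfRequestedOverdraftMemorySegments`. `OverdraftGuardSketch` is hypothetical and does not model the global pool itself running out of segments.

   ```java
   /**
    * Hypothetical model of the guard in requestOverdraftMemorySegmentFromGlobal:
    * at most maxOverdraftBuffersPerGate overdraft segments, and none for UNKNOWN_CHANNEL.
    * Assumes a refused request returns null.
    */
   public class OverdraftGuardSketch {

       static final int UNKNOWN_CHANNEL = -1;

       private final int maxOverdraftBuffersPerGate;
       private int numberOfRequestedOverdraftMemorySegments;

       OverdraftGuardSketch(int maxOverdraftBuffersPerGate) {
           this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
       }

       /** Returns a placeholder segment, or null if the overdraft is refused. */
       Object requestOverdraftSegment(int targetChannel) {
           if (numberOfRequestedOverdraftMemorySegments >= maxOverdraftBuffersPerGate
                   || targetChannel == UNKNOWN_CHANNEL) {
               return null;
           }
           numberOfRequestedOverdraftMemorySegments++;
           return new Object(); // stands in for a segment taken from the global pool
       }

       int overdraftInUse() {
           return numberOfRequestedOverdraftMemorySegments;
       }

       public static void main(String[] args) {
           OverdraftGuardSketch pool = new OverdraftGuardSketch(2);
           System.out.println(pool.requestOverdraftSegment(UNKNOWN_CHANNEL) == null); // true
           System.out.println(pool.requestOverdraftSegment(0) != null); // true
           System.out.println(pool.requestOverdraftSegment(1) != null); // true
           System.out.println(pool.requestOverdraftSegment(0) == null); // true: cap of 2 reached
       }
   }
   ```

   With this guard, a request for `UNKNOWN_CHANNEL` never gets an overdraft segment even when the cap has not been reached, which is the behavior being questioned.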



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -273,6 +273,26 @@ public class NettyShuffleEnvironmentOptions {
                                     + " case of data skew and high number of configured floating buffers. This limit is not strictly guaranteed,"
                                     + " and can be ignored by things like flatMap operators, records spanning multiple buffers or single timer"
                                     + " producing large amount of data.");
+    /**
+     * Number of max overdraft network buffers to use for each ResultPartition. Currently, it just
+     * supports ResultPartition. InputGate can be supported if necessary.
+     */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> NETWORK_MAX_OVERDRAFT_BUFFERS_PER_GATE =
+            key("taskmanager.network.memory.max-overdraft-buffers-per-gate")
+                    .intType()
+                    .defaultValue(10)

Review Comment:
   The default value should be discussed later.



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -273,6 +273,26 @@ public class NettyShuffleEnvironmentOptions {
                                     + " case of data skew and high number of configured floating buffers. This limit is not strictly guaranteed,"
                                     + " and can be ignored by things like flatMap operators, records spanning multiple buffers or single timer"
                                     + " producing large amount of data.");
+    /**
+     * Number of max overdraft network buffers to use for each ResultPartition. Currently, it just
+     * supports ResultPartition. InputGate can be supported if necessary.
+     */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> NETWORK_MAX_OVERDRAFT_BUFFERS_PER_GATE =
+            key("taskmanager.network.memory.max-overdraft-buffers-per-gate")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            String.format(
+                                    "Number of max overdraft network buffers to use for each ResultPartition."
+                                            + " The overdraft buffers will be used when the ResultPartition cannot apply to the normal buffers from the"
+                                            + " LocalBufferPool(LocalBufferPool is unavailable), e.g: the all buffers of LocalBufferPool be applied or"
+                                            + " the number of buffers for a single channel has reached %s. When the LocalBufferPool is unavailable,"
+                                            + " the LocalBufferPool can provide some additional buffers to allow overdraft. It can effectively solve"
+                                            + " the problem that multiple output buffers are required to process a single data, causing the Task to block in the requestMemory,"
+                                            + " such as: flatMap operator, records spanning multiple buffers or a single timer generates a large amount of data."
+                                            + " It doesn't need to wait for ResultPartition is available to start Unaligned Checkpoint directly.",
+                                    NETWORK_MAX_BUFFERS_PER_CHANNEL.key()));

Review Comment:
   I am not really sure that we should mention `LocalBufferPool` here (we need to check how other configuration options are described). This is user documentation, so we should simply explain how and when this configuration can help the user.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -543,6 +543,164 @@ public void testConsistentAvailability() throws Exception {
         }
     }
 
+    @Test
+    public void testOverdraftBuffer() throws IOException, InterruptedException {

Review Comment:
   Why do you keep all scenarios in one test? It is too big and difficult to read. Can you please split this test into several tests with descriptive names?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -472,18 +518,22 @@ private void onGlobalPoolAvailable() {
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;
+        return !availableMemorySegments.isEmpty()
+                && unavailableSubpartitionsCount == 0
+                && numberOfRequestedOverdraftMemorySegments == 0;

Review Comment:
   I see the pattern `unavailableSubpartitionsCount == 0 && numberOfRequestedOverdraftMemorySegments == 0` in many places. I suppose we should extract it into a separate method to avoid misuse in the future.
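   A minimal sketch of the suggested extraction, assuming a plain `ArrayDeque` stands in for the segment queue and the other fields are simple counters. `AvailabilitySketch` and the helper name `hasNoBlockingSubpartitionOrOverdraft` are hypothetical; the point is only that every call site would use one named predicate instead of repeating the two comparisons.

   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;

   /**
    * Hypothetical sketch of the suggested refactoring: the repeated
    * "no blocked subpartition and no overdraft in use" check lives in one helper.
    */
   public class AvailabilitySketch {

       private final Queue<Object> availableMemorySegments = new ArrayDeque<>();
       int unavailableSubpartitionsCount;
       int numberOfRequestedOverdraftMemorySegments;

       /** Hypothetical name: true if no subpartition is over quota and no overdraft is in use. */
       private boolean hasNoBlockingSubpartitionOrOverdraft() {
           return unavailableSubpartitionsCount == 0
                   && numberOfRequestedOverdraftMemorySegments == 0;
       }

       /** Mirrors the diff: callers must hold the lock on availableMemorySegments. */
       private boolean shouldBeAvailable() {
           assert Thread.holdsLock(availableMemorySegments);
           return !availableMemorySegments.isEmpty() && hasNoBlockingSubpartitionOrOverdraft();
       }

       /** Entry point that takes the lock before checking availability. */
       boolean isAvailable() {
           synchronized (availableMemorySegments) {
               return shouldBeAvailable();
           }
       }

       void addSegment(Object segment) {
           synchronized (availableMemorySegments) {
               availableMemorySegments.add(segment);
           }
       }
   }
   ```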



-- 
This is an automated message from the Apache Git Service.