You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/09/15 02:03:59 UTC

[flink] branch master updated: [FLINK-29299][network] Fix the network memory size calculation issue in fine-grained resource mode

This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d3513d98953 [FLINK-29299][network] Fix the network memory size calculation issue in fine-grained resource mode
d3513d98953 is described below

commit d3513d98953b0922e3dc753ef90806ed4e264926
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Wed Sep 14 15:31:26 2022 +0800

    [FLINK-29299][network] Fix the network memory size calculation issue in fine-grained resource mode
    
    This closes #20834.
---
 .../SsgNetworkMemoryCalculationUtils.java          | 14 ++--
 .../flink/runtime/shuffle/NettyShuffleMaster.java  |  2 +-
 .../shuffle/TaskInputsOutputsDescriptor.java       | 13 +++-
 .../SsgNetworkMemoryCalculationUtilsTest.java      | 83 +++++++++++++++-------
 4 files changed, 81 insertions(+), 31 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
index 1506152fa38..0fbc58c3491 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
@@ -109,7 +109,7 @@ public class SsgNetworkMemoryCalculationUtils {
         Map<IntermediateDataSetID, ResultPartitionType> partitionTypes = getPartitionTypes(jv);
 
         return TaskInputsOutputsDescriptor.from(
-                maxInputChannelNums, maxSubpartitionNums, partitionTypes);
+                jv.getNumberOfInputs(), maxInputChannelNums, maxSubpartitionNums, partitionTypes);
     }
 
     private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNums(
@@ -130,7 +130,7 @@ public class SsgNetworkMemoryCalculationUtils {
                             ejv.getParallelism(),
                             consumedResult.getNumberOfAssignedPartitions(),
                             inputEdge.getDistributionPattern());
-            ret.put(consumedResult.getId(), maxNum);
+            ret.merge(consumedResult.getId(), maxNum, Integer::sum);
         }
 
         return ret;
@@ -177,6 +177,8 @@ public class SsgNetworkMemoryCalculationUtils {
         Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
 
         for (ExecutionVertex vertex : ejv.getTaskVertices()) {
+            Map<IntermediateDataSetID, Integer> tmp = new HashMap<>();
+
             for (ConsumedPartitionGroup partitionGroup : vertex.getAllConsumedPartitionGroups()) {
 
                 IntermediateResultPartition resultPartition =
@@ -187,10 +189,14 @@ public class SsgNetworkMemoryCalculationUtils {
                                 resultPartition,
                                 vertex.getParallelSubtaskIndex());
 
-                ret.merge(
+                tmp.merge(
                         partitionGroup.getIntermediateDataSetID(),
                         subpartitionIndexRange.size() * partitionGroup.size(),
-                        Integer::max);
+                        Integer::sum);
+            }
+
+            for (Map.Entry<IntermediateDataSetID, Integer> entry : tmp.entrySet()) {
+                ret.merge(entry.getKey(), entry.getValue(), Integer::max);
             }
         }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
index 53ea1e73cd8..59f83c73bf9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
@@ -106,7 +106,7 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
 
         int numTotalInputChannels =
                 desc.getInputChannelNums().values().stream().mapToInt(Integer::intValue).sum();
-        int numTotalInputGates = desc.getInputChannelNums().size();
+        int numTotalInputGates = desc.getInputGateNums();
 
         int numRequiredNetworkBuffers =
                 NettyShuffleUtils.computeNetworkBuffersForAnnouncing(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/TaskInputsOutputsDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/TaskInputsOutputsDescriptor.java
index 064543cd852..1cd5614e413 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/TaskInputsOutputsDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/TaskInputsOutputsDescriptor.java
@@ -29,6 +29,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /** Describes inputs and outputs information of a task. */
 public class TaskInputsOutputsDescriptor {
 
+    // Number of input gates.
+    private final int inputGateNums;
+
     // Number of input channels per dataSet.
     private final Map<IntermediateDataSetID, Integer> inputChannelNums;
 
@@ -39,6 +42,7 @@ public class TaskInputsOutputsDescriptor {
     private final Map<IntermediateDataSetID, ResultPartitionType> partitionTypes;
 
     private TaskInputsOutputsDescriptor(
+            int inputGateNums,
             Map<IntermediateDataSetID, Integer> inputChannelNums,
             Map<IntermediateDataSetID, Integer> subpartitionNums,
             Map<IntermediateDataSetID, ResultPartitionType> partitionTypes) {
@@ -47,11 +51,16 @@ public class TaskInputsOutputsDescriptor {
         checkNotNull(subpartitionNums);
         checkNotNull(partitionTypes);
 
+        this.inputGateNums = inputGateNums;
         this.inputChannelNums = inputChannelNums;
         this.subpartitionNums = subpartitionNums;
         this.partitionTypes = partitionTypes;
     }
 
+    public int getInputGateNums() {
+        return inputGateNums;
+    }
+
     public Map<IntermediateDataSetID, Integer> getInputChannelNums() {
         return Collections.unmodifiableMap(inputChannelNums);
     }
@@ -65,10 +74,12 @@ public class TaskInputsOutputsDescriptor {
     }
 
     public static TaskInputsOutputsDescriptor from(
+            int inputGateNums,
             Map<IntermediateDataSetID, Integer> inputChannelNums,
             Map<IntermediateDataSetID, Integer> subpartitionNums,
             Map<IntermediateDataSetID, ResultPartitionType> partitionTypes) {
 
-        return new TaskInputsOutputsDescriptor(inputChannelNums, subpartitionNums, partitionTypes);
+        return new TaskInputsOutputsDescriptor(
+                inputGateNums, inputChannelNums, subpartitionNums, partitionTypes);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
index a88566e6161..b81824b89bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
@@ -70,6 +70,39 @@ public class SsgNetworkMemoryCalculationUtilsTest {
 
     @Test
     public void testGenerateEnrichedResourceProfile() throws Exception {
+        // 1. for the first source vertex, no input channel, 1 or 2 subpartitions for point-wise
+        // output edge (the max is 2)
+        // 2. for the second map vertex, 1 input channel for point-wise edge, 2 result partitions
+        // for 2 all-to-all output edges (no partition reuse for pipelined shuffle), each with 6
+        // subpartitions
+        // 3. for the third sink vertex, 10 input channels for two result partitions, each with 5
+        // channels, no shuffle output
+        testGenerateEnrichedResourceProfile(
+                ResultPartitionType.PIPELINED,
+                new MemorySize(
+                        TestShuffleMaster.computeRequiredShuffleMemoryBytes(0, 2)
+                                + TestShuffleMaster.computeRequiredShuffleMemoryBytes(1, 12)),
+                new MemorySize(TestShuffleMaster.computeRequiredShuffleMemoryBytes(10, 0)));
+
+        // 1. for the first source vertex, no input channel, 1 or 2 subpartitions for point-wise
+        // output edge (the max is 2)
+        // 2. for the second map vertex, 1 input channel for point-wise edge, 1 result partition (6
+        // subpartitions) for 2 all-to-all output edges (partition reused for blocking shuffle)
+        // 3. for the third sink vertex, 10 input channels for 2 all-to-all input edges consuming 1
+        // result partition, each with 5 channels, no shuffle output
+        testGenerateEnrichedResourceProfile(
+                ResultPartitionType.BLOCKING,
+                new MemorySize(
+                        TestShuffleMaster.computeRequiredShuffleMemoryBytes(0, 2)
+                                + TestShuffleMaster.computeRequiredShuffleMemoryBytes(1, 6)),
+                new MemorySize(TestShuffleMaster.computeRequiredShuffleMemoryBytes(10, 0)));
+    }
+
+    private void testGenerateEnrichedResourceProfile(
+            ResultPartitionType resultPartitionType,
+            MemorySize group0MemorySize,
+            MemorySize group1MemorySize)
+            throws Exception {
 
         SlotSharingGroup slotSharingGroup0 = new SlotSharingGroup();
         slotSharingGroup0.setResourceProfile(DEFAULT_RESOURCE);
@@ -78,16 +111,11 @@ public class SsgNetworkMemoryCalculationUtilsTest {
         slotSharingGroup1.setResourceProfile(DEFAULT_RESOURCE);
 
         createExecutionGraphAndEnrichNetworkMemory(
-                Arrays.asList(slotSharingGroup0, slotSharingGroup0, slotSharingGroup1));
+                Arrays.asList(slotSharingGroup0, slotSharingGroup0, slotSharingGroup1),
+                resultPartitionType);
 
-        assertEquals(
-                new MemorySize(
-                        TestShuffleMaster.computeRequiredShuffleMemoryBytes(0, 2)
-                                + TestShuffleMaster.computeRequiredShuffleMemoryBytes(1, 6)),
-                slotSharingGroup0.getResourceProfile().getNetworkMemory());
-        assertEquals(
-                new MemorySize(TestShuffleMaster.computeRequiredShuffleMemoryBytes(5, 0)),
-                slotSharingGroup1.getResourceProfile().getNetworkMemory());
+        assertEquals(group0MemorySize, slotSharingGroup0.getResourceProfile().getNetworkMemory());
+        assertEquals(group1MemorySize, slotSharingGroup1.getResourceProfile().getNetworkMemory());
     }
 
     @Test
@@ -99,7 +127,8 @@ public class SsgNetworkMemoryCalculationUtilsTest {
         slotSharingGroup1.setResourceProfile(ResourceProfile.UNKNOWN);
 
         createExecutionGraphAndEnrichNetworkMemory(
-                Arrays.asList(slotSharingGroup0, slotSharingGroup0, slotSharingGroup1));
+                Arrays.asList(slotSharingGroup0, slotSharingGroup0, slotSharingGroup1),
+                ResultPartitionType.PIPELINED);
 
         assertEquals(ResourceProfile.UNKNOWN, slotSharingGroup0.getResourceProfile());
         assertEquals(ResourceProfile.UNKNOWN, slotSharingGroup1.getResourceProfile());
@@ -138,7 +167,7 @@ public class SsgNetworkMemoryCalculationUtilsTest {
                         new MemorySize(TestShuffleMaster.computeRequiredShuffleMemoryBytes(0, 5)),
                         new MemorySize(TestShuffleMaster.computeRequiredShuffleMemoryBytes(5, 20)),
                         new MemorySize(
-                                TestShuffleMaster.computeRequiredShuffleMemoryBytes(15, 0))));
+                                TestShuffleMaster.computeRequiredShuffleMemoryBytes(30, 0))));
     }
 
     private void triggerComputeNumOfSubpartitions(IntermediateResult result) {
@@ -213,7 +242,9 @@ public class SsgNetworkMemoryCalculationUtilsTest {
             final List<SlotSharingGroup> slotSharingGroups, int defaultMaxParallelism)
             throws Exception {
 
-        JobGraph jobGraph = createBatchGraph(slotSharingGroups, Arrays.asList(4, -1, -1));
+        JobGraph jobGraph =
+                createJobGraph(
+                        slotSharingGroups, Arrays.asList(4, -1, -1), ResultPartitionType.BLOCKING);
 
         final VertexParallelismStore vertexParallelismStore =
                 AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(
@@ -227,23 +258,16 @@ public class SsgNetworkMemoryCalculationUtilsTest {
     }
 
     private void createExecutionGraphAndEnrichNetworkMemory(
-            final List<SlotSharingGroup> slotSharingGroups) throws Exception {
+            final List<SlotSharingGroup> slotSharingGroups, ResultPartitionType resultPartitionType)
+            throws Exception {
         TestingDefaultExecutionGraphBuilder.newBuilder()
-                .setJobGraph(createStreamingGraph(slotSharingGroups, Arrays.asList(4, 5, 6)))
+                .setJobGraph(
+                        createJobGraph(
+                                slotSharingGroups, Arrays.asList(4, 5, 6), resultPartitionType))
                 .setShuffleMaster(SHUFFLE_MASTER)
                 .build(EXECUTOR_RESOURCE.getExecutor());
     }
 
-    private static JobGraph createStreamingGraph(
-            final List<SlotSharingGroup> slotSharingGroups, List<Integer> parallelisms) {
-        return createJobGraph(slotSharingGroups, parallelisms, ResultPartitionType.PIPELINED);
-    }
-
-    private static JobGraph createBatchGraph(
-            final List<SlotSharingGroup> slotSharingGroups, List<Integer> parallelisms) {
-        return createJobGraph(slotSharingGroups, parallelisms, ResultPartitionType.BLOCKING);
-    }
-
     private static JobGraph createJobGraph(
             final List<SlotSharingGroup> slotSharingGroups,
             List<Integer> parallelisms,
@@ -268,7 +292,16 @@ public class SsgNetworkMemoryCalculationUtilsTest {
         sink.setSlotSharingGroup(slotSharingGroups.get(2));
 
         map.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, resultPartitionType);
-        sink.connectNewDataSetAsInput(map, DistributionPattern.ALL_TO_ALL, resultPartitionType);
+        if (resultPartitionType == ResultPartitionType.BLOCKING) {
+            IntermediateDataSetID dataSetId = new IntermediateDataSetID();
+            sink.connectNewDataSetAsInput(
+                    map, DistributionPattern.ALL_TO_ALL, resultPartitionType, dataSetId, false);
+            sink.connectNewDataSetAsInput(
+                    map, DistributionPattern.ALL_TO_ALL, resultPartitionType, dataSetId, false);
+        } else {
+            sink.connectNewDataSetAsInput(map, DistributionPattern.ALL_TO_ALL, resultPartitionType);
+            sink.connectNewDataSetAsInput(map, DistributionPattern.ALL_TO_ALL, resultPartitionType);
+        }
 
         if (!resultPartitionType.isBlockingOrBlockingPersistentResultPartition()) {
             return JobGraphTestUtils.streamingJobGraph(source, map, sink);