You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/09/15 02:03:59 UTC
[flink] branch master updated: [FLINK-29299][network] Fix the network memory size calculation issue in fine-grained resource mode
This is an automated email from the ASF dual-hosted git repository.
yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d3513d98953 [FLINK-29299][network] Fix the network memory size calculation issue in fine-grained resource mode
d3513d98953 is described below
commit d3513d98953b0922e3dc753ef90806ed4e264926
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Wed Sep 14 15:31:26 2022 +0800
[FLINK-29299][network] Fix the network memory size calculation issue in fine-grained resource mode
This closes #20834.
---
.../SsgNetworkMemoryCalculationUtils.java | 14 ++--
.../flink/runtime/shuffle/NettyShuffleMaster.java | 2 +-
.../shuffle/TaskInputsOutputsDescriptor.java | 13 +++-
.../SsgNetworkMemoryCalculationUtilsTest.java | 83 +++++++++++++++-------
4 files changed, 81 insertions(+), 31 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
index 1506152fa38..0fbc58c3491 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
@@ -109,7 +109,7 @@ public class SsgNetworkMemoryCalculationUtils {
Map<IntermediateDataSetID, ResultPartitionType> partitionTypes = getPartitionTypes(jv);
return TaskInputsOutputsDescriptor.from(
- maxInputChannelNums, maxSubpartitionNums, partitionTypes);
+ jv.getNumberOfInputs(), maxInputChannelNums, maxSubpartitionNums, partitionTypes);
}
private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNums(
@@ -130,7 +130,7 @@ public class SsgNetworkMemoryCalculationUtils {
ejv.getParallelism(),
consumedResult.getNumberOfAssignedPartitions(),
inputEdge.getDistributionPattern());
- ret.put(consumedResult.getId(), maxNum);
+ ret.merge(consumedResult.getId(), maxNum, Integer::sum);
}
return ret;
@@ -177,6 +177,8 @@ public class SsgNetworkMemoryCalculationUtils {
Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
for (ExecutionVertex vertex : ejv.getTaskVertices()) {
+ Map<IntermediateDataSetID, Integer> tmp = new HashMap<>();
+
for (ConsumedPartitionGroup partitionGroup : vertex.getAllConsumedPartitionGroups()) {
IntermediateResultPartition resultPartition =
@@ -187,10 +189,14 @@ public class SsgNetworkMemoryCalculationUtils {
resultPartition,
vertex.getParallelSubtaskIndex());
- ret.merge(
+ tmp.merge(
partitionGroup.getIntermediateDataSetID(),
subpartitionIndexRange.size() * partitionGroup.size(),
- Integer::max);
+ Integer::sum);
+ }
+
+ for (Map.Entry<IntermediateDataSetID, Integer> entry : tmp.entrySet()) {
+ ret.merge(entry.getKey(), entry.getValue(), Integer::max);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
index 53ea1e73cd8..59f83c73bf9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
@@ -106,7 +106,7 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
int numTotalInputChannels =
desc.getInputChannelNums().values().stream().mapToInt(Integer::intValue).sum();
- int numTotalInputGates = desc.getInputChannelNums().size();
+ int numTotalInputGates = desc.getInputGateNums();
int numRequiredNetworkBuffers =
NettyShuffleUtils.computeNetworkBuffersForAnnouncing(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/TaskInputsOutputsDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/TaskInputsOutputsDescriptor.java
index 064543cd852..1cd5614e413 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/TaskInputsOutputsDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/TaskInputsOutputsDescriptor.java
@@ -29,6 +29,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/** Describes inputs and outputs information of a task. */
public class TaskInputsOutputsDescriptor {
+ // Number of input gates
+ private final int inputGateNums;
+
// Number of input channels per dataSet.
private final Map<IntermediateDataSetID, Integer> inputChannelNums;
@@ -39,6 +42,7 @@ public class TaskInputsOutputsDescriptor {
private final Map<IntermediateDataSetID, ResultPartitionType> partitionTypes;
private TaskInputsOutputsDescriptor(
+ int inputGateNums,
Map<IntermediateDataSetID, Integer> inputChannelNums,
Map<IntermediateDataSetID, Integer> subpartitionNums,
Map<IntermediateDataSetID, ResultPartitionType> partitionTypes) {
@@ -47,11 +51,16 @@ public class TaskInputsOutputsDescriptor {
checkNotNull(subpartitionNums);
checkNotNull(partitionTypes);
+ this.inputGateNums = inputGateNums;
this.inputChannelNums = inputChannelNums;
this.subpartitionNums = subpartitionNums;
this.partitionTypes = partitionTypes;
}
+ public int getInputGateNums() {
+ return inputGateNums;
+ }
+
public Map<IntermediateDataSetID, Integer> getInputChannelNums() {
return Collections.unmodifiableMap(inputChannelNums);
}
@@ -65,10 +74,12 @@ public class TaskInputsOutputsDescriptor {
}
public static TaskInputsOutputsDescriptor from(
+ int inputGateNums,
Map<IntermediateDataSetID, Integer> inputChannelNums,
Map<IntermediateDataSetID, Integer> subpartitionNums,
Map<IntermediateDataSetID, ResultPartitionType> partitionTypes) {
- return new TaskInputsOutputsDescriptor(inputChannelNums, subpartitionNums, partitionTypes);
+ return new TaskInputsOutputsDescriptor(
+ inputGateNums, inputChannelNums, subpartitionNums, partitionTypes);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
index a88566e6161..b81824b89bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
@@ -70,6 +70,39 @@ public class SsgNetworkMemoryCalculationUtilsTest {
@Test
public void testGenerateEnrichedResourceProfile() throws Exception {
+ // 1. for the first source vertex, no input channel, 1 or 2 subpartitions for point-wise
+ // output edge (the max is 2)
+ // 2. for the second map vertex, 1 input channel for point-wise edge, 2 result partitions
+ // for 2 all-to-all output edges (no partition reuse for pipelined shuffle), each with 6
+ // subpartitions
+ // 3. for the third sink vertex, 10 input channels for two result partitions, each with 5
+ // channels, no shuffle output
+ testGenerateEnrichedResourceProfile(
+ ResultPartitionType.PIPELINED,
+ new MemorySize(
+ TestShuffleMaster.computeRequiredShuffleMemoryBytes(0, 2)
+ + TestShuffleMaster.computeRequiredShuffleMemoryBytes(1, 12)),
+ new MemorySize(TestShuffleMaster.computeRequiredShuffleMemoryBytes(10, 0)));
+
+ // 1. for the first source vertex, no input channel, 1 or 2 subpartitions for point-wise
+ // output edge (the max is 2)
+ // 2. for the second map vertex, 1 input channel for point-wise edge, 1 result partition (6
+ // subpartitions) for 2 all-to-all output edges (partition reused for blocking shuffle)
+ // 3. for the third sink vertex, 10 input channels for 2 all-to-all input edges consuming 1
+ // result partition, each with 5 channels, no shuffle output
+ testGenerateEnrichedResourceProfile(
+ ResultPartitionType.BLOCKING,
+ new MemorySize(
+ TestShuffleMaster.computeRequiredShuffleMemoryBytes(0, 2)
+ + TestShuffleMaster.computeRequiredShuffleMemoryBytes(1, 6)),
+ new MemorySize(TestShuffleMaster.computeRequiredShuffleMemoryBytes(10, 0)));
+ }
+
+ private void testGenerateEnrichedResourceProfile(
+ ResultPartitionType resultPartitionType,
+ MemorySize group0MemorySize,
+ MemorySize group1MemorySize)
+ throws Exception {
SlotSharingGroup slotSharingGroup0 = new SlotSharingGroup();
slotSharingGroup0.setResourceProfile(DEFAULT_RESOURCE);
@@ -78,16 +111,11 @@ public class SsgNetworkMemoryCalculationUtilsTest {
slotSharingGroup1.setResourceProfile(DEFAULT_RESOURCE);
createExecutionGraphAndEnrichNetworkMemory(
- Arrays.asList(slotSharingGroup0, slotSharingGroup0, slotSharingGroup1));
+ Arrays.asList(slotSharingGroup0, slotSharingGroup0, slotSharingGroup1),
+ resultPartitionType);
- assertEquals(
- new MemorySize(
- TestShuffleMaster.computeRequiredShuffleMemoryBytes(0, 2)
- + TestShuffleMaster.computeRequiredShuffleMemoryBytes(1, 6)),
- slotSharingGroup0.getResourceProfile().getNetworkMemory());
- assertEquals(
- new MemorySize(TestShuffleMaster.computeRequiredShuffleMemoryBytes(5, 0)),
- slotSharingGroup1.getResourceProfile().getNetworkMemory());
+ assertEquals(group0MemorySize, slotSharingGroup0.getResourceProfile().getNetworkMemory());
+ assertEquals(group1MemorySize, slotSharingGroup1.getResourceProfile().getNetworkMemory());
}
@Test
@@ -99,7 +127,8 @@ public class SsgNetworkMemoryCalculationUtilsTest {
slotSharingGroup1.setResourceProfile(ResourceProfile.UNKNOWN);
createExecutionGraphAndEnrichNetworkMemory(
- Arrays.asList(slotSharingGroup0, slotSharingGroup0, slotSharingGroup1));
+ Arrays.asList(slotSharingGroup0, slotSharingGroup0, slotSharingGroup1),
+ ResultPartitionType.PIPELINED);
assertEquals(ResourceProfile.UNKNOWN, slotSharingGroup0.getResourceProfile());
assertEquals(ResourceProfile.UNKNOWN, slotSharingGroup1.getResourceProfile());
@@ -138,7 +167,7 @@ public class SsgNetworkMemoryCalculationUtilsTest {
new MemorySize(TestShuffleMaster.computeRequiredShuffleMemoryBytes(0, 5)),
new MemorySize(TestShuffleMaster.computeRequiredShuffleMemoryBytes(5, 20)),
new MemorySize(
- TestShuffleMaster.computeRequiredShuffleMemoryBytes(15, 0))));
+ TestShuffleMaster.computeRequiredShuffleMemoryBytes(30, 0))));
}
private void triggerComputeNumOfSubpartitions(IntermediateResult result) {
@@ -213,7 +242,9 @@ public class SsgNetworkMemoryCalculationUtilsTest {
final List<SlotSharingGroup> slotSharingGroups, int defaultMaxParallelism)
throws Exception {
- JobGraph jobGraph = createBatchGraph(slotSharingGroups, Arrays.asList(4, -1, -1));
+ JobGraph jobGraph =
+ createJobGraph(
+ slotSharingGroups, Arrays.asList(4, -1, -1), ResultPartitionType.BLOCKING);
final VertexParallelismStore vertexParallelismStore =
AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(
@@ -227,23 +258,16 @@ public class SsgNetworkMemoryCalculationUtilsTest {
}
private void createExecutionGraphAndEnrichNetworkMemory(
- final List<SlotSharingGroup> slotSharingGroups) throws Exception {
+ final List<SlotSharingGroup> slotSharingGroups, ResultPartitionType resultPartitionType)
+ throws Exception {
TestingDefaultExecutionGraphBuilder.newBuilder()
- .setJobGraph(createStreamingGraph(slotSharingGroups, Arrays.asList(4, 5, 6)))
+ .setJobGraph(
+ createJobGraph(
+ slotSharingGroups, Arrays.asList(4, 5, 6), resultPartitionType))
.setShuffleMaster(SHUFFLE_MASTER)
.build(EXECUTOR_RESOURCE.getExecutor());
}
- private static JobGraph createStreamingGraph(
- final List<SlotSharingGroup> slotSharingGroups, List<Integer> parallelisms) {
- return createJobGraph(slotSharingGroups, parallelisms, ResultPartitionType.PIPELINED);
- }
-
- private static JobGraph createBatchGraph(
- final List<SlotSharingGroup> slotSharingGroups, List<Integer> parallelisms) {
- return createJobGraph(slotSharingGroups, parallelisms, ResultPartitionType.BLOCKING);
- }
-
private static JobGraph createJobGraph(
final List<SlotSharingGroup> slotSharingGroups,
List<Integer> parallelisms,
@@ -268,7 +292,16 @@ public class SsgNetworkMemoryCalculationUtilsTest {
sink.setSlotSharingGroup(slotSharingGroups.get(2));
map.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, resultPartitionType);
- sink.connectNewDataSetAsInput(map, DistributionPattern.ALL_TO_ALL, resultPartitionType);
+ if (resultPartitionType == ResultPartitionType.BLOCKING) {
+ IntermediateDataSetID dataSetId = new IntermediateDataSetID();
+ sink.connectNewDataSetAsInput(
+ map, DistributionPattern.ALL_TO_ALL, resultPartitionType, dataSetId, false);
+ sink.connectNewDataSetAsInput(
+ map, DistributionPattern.ALL_TO_ALL, resultPartitionType, dataSetId, false);
+ } else {
+ sink.connectNewDataSetAsInput(map, DistributionPattern.ALL_TO_ALL, resultPartitionType);
+ sink.connectNewDataSetAsInput(map, DistributionPattern.ALL_TO_ALL, resultPartitionType);
+ }
if (!resultPartitionType.isBlockingOrBlockingPersistentResultPartition()) {
return JobGraphTestUtils.streamingJobGraph(source, map, sink);