You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/19 10:39:19 UTC

[GitHub] [flink] zhuzhurk commented on a change in pull request #18376: [FLINK-25668][runtime] Support calculate network memory for dynamic graph

zhuzhurk commented on a change in pull request #18376:
URL: https://github.com/apache/flink/pull/18376#discussion_r786618871



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
##########
@@ -148,6 +161,78 @@ private static TaskInputsOutputsDescriptor buildTaskInputsOutputsDescriptor(
         return ret;
     }
 
+    private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+        for (IntermediateResult consumedResult : ejv.getInputs()) {
+            ret.put(consumedResult.getId(), getMaxInputChannelNumForResult(ejv, consumedResult));
+        }
+
+        return ret;
+    }
+
+    private static Map<IntermediateDataSetID, Integer> getMaxSubpartitionNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        for (IntermediateResult intermediateResult : ejv.getProducedDataSets()) {
+            final int maxNum =
+                    Arrays.stream(intermediateResult.getPartitions())
+                            .map(IntermediateResultPartition::getNumberOfSubpartitions)
+                            .reduce(0, Integer::max);
+            ret.put(intermediateResult.getId(), maxNum);
+        }
+
+        return ret;
+    }
+
+    @VisibleForTesting
+    static int getMaxInputChannelNumForResult(
+            ExecutionJobVertex ejv, IntermediateResult consumedResult) {
+        DistributionPattern distributionPattern = consumedResult.getConsumingDistributionPattern();
+
+        if (distributionPattern == DistributionPattern.ALL_TO_ALL) {
+            int numChannelsToConsumePerPartition =
+                    getMaxNumOfChannelsForConsuming(consumedResult.getPartitions()[0]);
+            int numConsumedPartitions = consumedResult.getNumberOfAssignedPartitions();
+            return numChannelsToConsumePerPartition * numConsumedPartitions;
+
+        } else if (distributionPattern == DistributionPattern.POINTWISE) {
+            int numPartitions = consumedResult.getNumberOfAssignedPartitions();
+            int numConsumers = ejv.getParallelism();
+            // when using dynamic graph, all partitions have the same number of subpartitions

Review comment:
       this is not true for the POINTWISE case. One example is when the consumer parallelism is explicitly set by users in a dynamic graph, in which case partitions may have different numbers of subpartitions.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
##########
@@ -148,6 +161,78 @@ private static TaskInputsOutputsDescriptor buildTaskInputsOutputsDescriptor(
         return ret;
     }
 
+    private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+        for (IntermediateResult consumedResult : ejv.getInputs()) {
+            ret.put(consumedResult.getId(), getMaxInputChannelNumForResult(ejv, consumedResult));
+        }
+
+        return ret;
+    }
+
+    private static Map<IntermediateDataSetID, Integer> getMaxSubpartitionNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        for (IntermediateResult intermediateResult : ejv.getProducedDataSets()) {
+            final int maxNum =
+                    Arrays.stream(intermediateResult.getPartitions())
+                            .map(IntermediateResultPartition::getNumberOfSubpartitions)
+                            .reduce(0, Integer::max);
+            ret.put(intermediateResult.getId(), maxNum);
+        }
+
+        return ret;
+    }
+
+    @VisibleForTesting
+    static int getMaxInputChannelNumForResult(
+            ExecutionJobVertex ejv, IntermediateResult consumedResult) {
+        DistributionPattern distributionPattern = consumedResult.getConsumingDistributionPattern();
+
+        if (distributionPattern == DistributionPattern.ALL_TO_ALL) {
+            int numChannelsToConsumePerPartition =
+                    getMaxNumOfChannelsForConsuming(consumedResult.getPartitions()[0]);
+            int numConsumedPartitions = consumedResult.getNumberOfAssignedPartitions();
+            return numChannelsToConsumePerPartition * numConsumedPartitions;
+
+        } else if (distributionPattern == DistributionPattern.POINTWISE) {
+            int numPartitions = consumedResult.getNumberOfAssignedPartitions();
+            int numConsumers = ejv.getParallelism();
+            // when using dynamic graph, all partitions have the same number of subpartitions
+            int numOfSubpartitionsPerPartition =
+                    consumedResult.getPartitions()[0].getNumberOfSubpartitions();
+
+            if (numPartitions >= numConsumers) {
+                // multiple partitions to one consumer
+                int maxConsumedPartitionsPerConsumer =
+                        (int) Math.ceil((double) numPartitions / numConsumers);
+                return numOfSubpartitionsPerPartition * maxConsumedPartitionsPerConsumer;
+            } else {
+                // one partition to multiple consumers
+                int minConsumersPerPartition =
+                        (int) Math.floor((double) numConsumers / numPartitions);
+                return (int)
+                        Math.ceil(
+                                (double) numOfSubpartitionsPerPartition / minConsumersPerPartition);
+            }

Review comment:
       Is it possible to replace this if block with `EdgeManagerBuildUtil.computeMaxEdgesToTargetExecutionVertex`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org