Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/17 08:35:57 UTC

[GitHub] [flink] wanglijie95 opened a new pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

wanglijie95 opened a new pull request #18376:
URL: https://github.com/apache/flink/pull/18376


   ## What is the purpose of the change
   Support calculating network memory for the dynamic graph.
   
   ## Brief change log
   a415c9901fe5addb7e042b2ea760b2de95c736ca Support calculate network memory for dynamic graph.
   
   
   ## Verifying this change
   Unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (**no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**)
     - The serializers: (**no**)
     - The runtime per-record code paths (performance sensitive): (**no**)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**no**)
     - The S3 file system connector: (**no**)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**no**)
     - If yes, how is the feature documented? (**not applicable**)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] zhuzhurk commented on a change in pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #18376:
URL: https://github.com/apache/flink/pull/18376#discussion_r790594456



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
##########
@@ -148,6 +167,80 @@ private static TaskInputsOutputsDescriptor buildTaskInputsOutputsDescriptor(
         return ret;
     }
 
+    private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        final Map<Tuple2<IntermediateDataSetID, ExecutionVertexID>, ConsumedPartitionGroup>
+                consumedPartitionGroups = new HashMap<>();
+        for (ExecutionVertex vertex : ejv.getTaskVertices()) {
+            for (ConsumedPartitionGroup partitionGroup : vertex.getAllConsumedPartitionGroups()) {
+                consumedPartitionGroups.put(
+                        new Tuple2<>(partitionGroup.getIntermediateDataSetID(), vertex.getID()),
+                        partitionGroup);
+            }
+        }
+
+        for (IntermediateResult consumedResult : ejv.getInputs()) {
+            ret.put(
+                    consumedResult.getId(),
+                    getMaxInputChannelNumForResult(
+                            ejv,
+                            consumedResult.getId(),
+                            (resultId, vertexId) ->
+                                    consumedPartitionGroups.get(new Tuple2<>(resultId, vertexId))));
+        }
+
+        return ret;
+    }
+
+    private static Map<IntermediateDataSetID, Integer> getMaxSubpartitionNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        for (IntermediateResult intermediateResult : ejv.getProducedDataSets()) {
+            final int maxNum =
+                    Arrays.stream(intermediateResult.getPartitions())
+                            .map(IntermediateResultPartition::getNumberOfSubpartitions)
+                            .reduce(0, Integer::max);
+            ret.put(intermediateResult.getId(), maxNum);
+        }
+
+        return ret;
+    }
+
+    @VisibleForTesting
+    static int getMaxInputChannelNumForResult(

Review comment:
       I would change the method to `Map<IntermediateDataSetID, Integer> getMaxInputChannelNumForJobVertex(ExecutionJobVertex)`. That way, we do not need to pre-build the map for the retriever, and `testGetMaxInputChannelNumForResult()` can be simplified.
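
The suggested shape can be illustrated with a minimal sketch. Only the method name `getMaxInputChannelNumForJobVertex` comes from the comment above; `ConsumedGroup`, the `String` result IDs, and the list-of-subtasks parameter are hypothetical stand-ins for the Flink types, chosen so the example is self-contained. The point is that the caller passes the whole job vertex and the method derives everything it needs internally, so no retriever map has to be built up front.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: none of these types are Flink classes. */
final class MaxInputChannelSketch {

    /** Hypothetical view of one consumed partition group, as seen by one subtask. */
    static final class ConsumedGroup {
        final String resultId;
        final int numPartitions;
        final int consumedSubpartitionsPerPartition;

        ConsumedGroup(String resultId, int numPartitions, int consumedSubpartitionsPerPartition) {
            this.resultId = resultId;
            this.numPartitions = numPartitions;
            this.consumedSubpartitionsPerPartition = consumedSubpartitionsPerPartition;
        }

        /** One input channel per consumed (partition, subpartition) pair. */
        int numChannels() {
            return numPartitions * consumedSubpartitionsPerPartition;
        }
    }

    /**
     * Takes the whole job vertex (modelled here as a list of subtasks, each with its consumed
     * groups) and returns, per consumed result, the maximum channel count over all subtasks.
     */
    static Map<String, Integer> getMaxInputChannelNumForJobVertex(
            List<List<ConsumedGroup>> subtasks) {
        Map<String, Integer> ret = new HashMap<>();
        for (List<ConsumedGroup> groups : subtasks) {
            for (ConsumedGroup group : groups) {
                // Keep the larger of the stored value and this subtask's channel count.
                ret.merge(group.resultId, group.numChannels(), Integer::max);
            }
        }
        return ret;
    }
}
```

A test for a method of this shape only needs to construct the vertex-level input and compare the returned map, which is presumably the simplification the comment has in mind.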







[GitHub] [flink] wanglijie95 commented on a change in pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on a change in pull request #18376:
URL: https://github.com/apache/flink/pull/18376#discussion_r790694952



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
##########
@@ -148,6 +167,80 @@ private static TaskInputsOutputsDescriptor buildTaskInputsOutputsDescriptor(
         return ret;
     }
 
+    private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        final Map<Tuple2<IntermediateDataSetID, ExecutionVertexID>, ConsumedPartitionGroup>
+                consumedPartitionGroups = new HashMap<>();
+        for (ExecutionVertex vertex : ejv.getTaskVertices()) {
+            for (ConsumedPartitionGroup partitionGroup : vertex.getAllConsumedPartitionGroups()) {
+                consumedPartitionGroups.put(
+                        new Tuple2<>(partitionGroup.getIntermediateDataSetID(), vertex.getID()),
+                        partitionGroup);
+            }
+        }
+
+        for (IntermediateResult consumedResult : ejv.getInputs()) {
+            ret.put(
+                    consumedResult.getId(),
+                    getMaxInputChannelNumForResult(
+                            ejv,
+                            consumedResult.getId(),
+                            (resultId, vertexId) ->
+                                    consumedPartitionGroups.get(new Tuple2<>(resultId, vertexId))));
+        }
+
+        return ret;
+    }
+
+    private static Map<IntermediateDataSetID, Integer> getMaxSubpartitionNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        for (IntermediateResult intermediateResult : ejv.getProducedDataSets()) {
+            final int maxNum =
+                    Arrays.stream(intermediateResult.getPartitions())
+                            .map(IntermediateResultPartition::getNumberOfSubpartitions)
+                            .reduce(0, Integer::max);
+            ret.put(intermediateResult.getId(), maxNum);
+        }
+
+        return ret;
+    }
+
+    @VisibleForTesting
+    static int getMaxInputChannelNumForResult(

Review comment:
       Do you mean merging `getMaxInputChannelNumForResult` into `getMaxInputChannelNumsForDynamicGraph`?







[GitHub] [flink] flinkbot edited a comment on pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18376:
URL: https://github.com/apache/flink/pull/18376#issuecomment-1014266348


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29529",
       "triggerID" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29992",
       "triggerID" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "320757eaf30cae2a2b95c06e559c37e068120ef4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "320757eaf30cae2a2b95c06e559c37e068120ef4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 369e3ea1432e9b033e5660642ab0f86b03ab4d79 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29992) 
   * 320757eaf30cae2a2b95c06e559c37e068120ef4 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18376:
URL: https://github.com/apache/flink/pull/18376#issuecomment-1014266348


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29529",
       "triggerID" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a415c9901fe5addb7e042b2ea760b2de95c736ca Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29529) 
   * 369e3ea1432e9b033e5660642ab0f86b03ab4d79 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18376:
URL: https://github.com/apache/flink/pull/18376#issuecomment-1014266348









[GitHub] [flink] wanglijie95 commented on a change in pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on a change in pull request #18376:
URL: https://github.com/apache/flink/pull/18376#discussion_r790425616



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
##########
@@ -148,6 +161,78 @@ private static TaskInputsOutputsDescriptor buildTaskInputsOutputsDescriptor(
         return ret;
     }
 
+    private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+        for (IntermediateResult consumedResult : ejv.getInputs()) {
+            ret.put(consumedResult.getId(), getMaxInputChannelNumForResult(ejv, consumedResult));
+        }
+
+        return ret;
+    }
+
+    private static Map<IntermediateDataSetID, Integer> getMaxSubpartitionNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        for (IntermediateResult intermediateResult : ejv.getProducedDataSets()) {
+            final int maxNum =
+                    Arrays.stream(intermediateResult.getPartitions())
+                            .map(IntermediateResultPartition::getNumberOfSubpartitions)
+                            .reduce(0, Integer::max);
+            ret.put(intermediateResult.getId(), maxNum);
+        }
+
+        return ret;
+    }
+
+    @VisibleForTesting
+    static int getMaxInputChannelNumForResult(
+            ExecutionJobVertex ejv, IntermediateResult consumedResult) {
+        DistributionPattern distributionPattern = consumedResult.getConsumingDistributionPattern();
+
+        if (distributionPattern == DistributionPattern.ALL_TO_ALL) {
+            int numChannelsToConsumePerPartition =
+                    getMaxNumOfChannelsForConsuming(consumedResult.getPartitions()[0]);
+            int numConsumedPartitions = consumedResult.getNumberOfAssignedPartitions();
+            return numChannelsToConsumePerPartition * numConsumedPartitions;
+
+        } else if (distributionPattern == DistributionPattern.POINTWISE) {
+            int numPartitions = consumedResult.getNumberOfAssignedPartitions();
+            int numConsumers = ejv.getParallelism();
+            // when using dynamic graph, all partitions have the same number of subpartitions

Review comment:
       > e.g. if someone later tries to optimize `IntermediateResultPartition#computeNumberOfSubpartitions()` to avoid creating extra subpartitions when the parallelism of the downstream vertex is already known, this assumption may break. However, he/she may not be aware of the code here, so the assumption here may stay unchanged.
   
   You are right. I will change it to use `TaskDeploymentDescriptorFactory#computeConsumedSubpartitionRange` to compute the subpartition range for each consumer, and then calculate the max number of channels from those ranges.
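
The range-based approach described here can be sketched as follows. This is a rough illustration, not Flink's implementation: the class `SubpartitionRangeSketch`, both method names, and the even integer-division split are assumptions made for the example, and the real `computeConsumedSubpartitionRange` may divide subpartitions differently. The sketch also assumes every consumer reads its range from each of `numConsumedPartitions` partitions.

```java
/** Illustrative only: the even split below is an assumption, not confirmed Flink behaviour. */
final class SubpartitionRangeSketch {

    /** Returns the inclusive {start, end} subpartition indexes read by one consumer. */
    static int[] consumedRange(int numConsumers, int consumerIndex, int numSubpartitions) {
        if (consumerIndex < 0 || consumerIndex >= numConsumers) {
            throw new IllegalArgumentException("consumerIndex out of range: " + consumerIndex);
        }
        if (numSubpartitions < numConsumers) {
            throw new IllegalArgumentException("fewer subpartitions than consumers");
        }
        // Assumed split: consumer i starts where consumer i - 1 ends.
        int start = consumerIndex * numSubpartitions / numConsumers;
        int nextStart = (consumerIndex + 1) * numSubpartitions / numConsumers;
        return new int[] {start, nextStart - 1};
    }

    /** Max channel count over all consumers, derived from each consumer's own range. */
    static int maxInputChannels(int numConsumers, int numSubpartitions, int numConsumedPartitions) {
        int max = 0;
        for (int i = 0; i < numConsumers; i++) {
            int[] range = consumedRange(numConsumers, i, numSubpartitions);
            int rangeSize = range[1] - range[0] + 1;
            max = Math.max(max, rangeSize * numConsumedPartitions);
        }
        return max;
    }
}
```

Because the maximum is taken over the ranges actually assigned to each consumer, the result does not rely on any closed-form ceil/floor formula staying in sync with the range computation.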







[GitHub] [flink] zhuzhurk commented on a change in pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #18376:
URL: https://github.com/apache/flink/pull/18376#discussion_r790281527



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
##########
@@ -148,6 +161,78 @@ private static TaskInputsOutputsDescriptor buildTaskInputsOutputsDescriptor(
         return ret;
     }
 
+    private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+        for (IntermediateResult consumedResult : ejv.getInputs()) {
+            ret.put(consumedResult.getId(), getMaxInputChannelNumForResult(ejv, consumedResult));
+        }
+
+        return ret;
+    }
+
+    private static Map<IntermediateDataSetID, Integer> getMaxSubpartitionNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        for (IntermediateResult intermediateResult : ejv.getProducedDataSets()) {
+            final int maxNum =
+                    Arrays.stream(intermediateResult.getPartitions())
+                            .map(IntermediateResultPartition::getNumberOfSubpartitions)
+                            .reduce(0, Integer::max);
+            ret.put(intermediateResult.getId(), maxNum);
+        }
+
+        return ret;
+    }
+
+    @VisibleForTesting
+    static int getMaxInputChannelNumForResult(
+            ExecutionJobVertex ejv, IntermediateResult consumedResult) {
+        DistributionPattern distributionPattern = consumedResult.getConsumingDistributionPattern();
+
+        if (distributionPattern == DistributionPattern.ALL_TO_ALL) {
+            int numChannelsToConsumePerPartition =
+                    getMaxNumOfChannelsForConsuming(consumedResult.getPartitions()[0]);
+            int numConsumedPartitions = consumedResult.getNumberOfAssignedPartitions();
+            return numChannelsToConsumePerPartition * numConsumedPartitions;
+
+        } else if (distributionPattern == DistributionPattern.POINTWISE) {
+            int numPartitions = consumedResult.getNumberOfAssignedPartitions();
+            int numConsumers = ejv.getParallelism();
+            // when using dynamic graph, all partitions have the same number of subpartitions

Review comment:
       I took a look at `IntermediateResultPartition` again and yes, you are right. But I feel that this assumption is easy to break and easy to lose track of, because it is more an implicit result of the implementation than an explicit contract.
   
   E.g. if someone later tries to optimize `IntermediateResultPartition#computeNumberOfSubpartitions()` to avoid creating extra subpartitions when the parallelism of the downstream vertex is already known, this assumption may break. However, he/she may not be aware of the code here, so the assumption here may stay unchanged.
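
One way to turn such an implicit assumption into an explicit one is sketched below. The class and method names are hypothetical, and plain `int[]` subpartition counts stand in for the Flink partition objects. The first method fails loudly if partitions ever stop agreeing; the second avoids the assumption altogether by taking the maximum, in the same spirit as the stream-based reduction in `getMaxSubpartitionNumsForDynamicGraph` in the hunk above.

```java
import java.util.Arrays;

/** Illustrative only: int[] counts stand in for per-partition subpartition numbers. */
final class UniformSubpartitionCheck {

    /** Returns the shared subpartition count, or throws if the partitions disagree. */
    static int requireUniformSubpartitionCount(int[] subpartitionCounts) {
        if (subpartitionCounts.length == 0) {
            throw new IllegalArgumentException("no partitions");
        }
        int first = subpartitionCounts[0];
        for (int count : subpartitionCounts) {
            if (count != first) {
                throw new IllegalStateException(
                        "partitions have different subpartition counts: "
                                + Arrays.toString(subpartitionCounts));
            }
        }
        return first;
    }

    /** Alternative that needs no uniformity assumption: the max over all partitions. */
    static int maxSubpartitionCount(int[] subpartitionCounts) {
        return Arrays.stream(subpartitionCounts).max().orElse(0);
    }
}
```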







[GitHub] [flink] zhuzhurk closed pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
zhuzhurk closed pull request #18376:
URL: https://github.com/apache/flink/pull/18376


   





[GitHub] [flink] wanglijie95 commented on a change in pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on a change in pull request #18376:
URL: https://github.com/apache/flink/pull/18376#discussion_r790694952



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
##########
@@ -148,6 +167,80 @@ private static TaskInputsOutputsDescriptor buildTaskInputsOutputsDescriptor(
         return ret;
     }
 
+    private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        final Map<Tuple2<IntermediateDataSetID, ExecutionVertexID>, ConsumedPartitionGroup>
+                consumedPartitionGroups = new HashMap<>();
+        for (ExecutionVertex vertex : ejv.getTaskVertices()) {
+            for (ConsumedPartitionGroup partitionGroup : vertex.getAllConsumedPartitionGroups()) {
+                consumedPartitionGroups.put(
+                        new Tuple2<>(partitionGroup.getIntermediateDataSetID(), vertex.getID()),
+                        partitionGroup);
+            }
+        }
+
+        for (IntermediateResult consumedResult : ejv.getInputs()) {
+            ret.put(
+                    consumedResult.getId(),
+                    getMaxInputChannelNumForResult(
+                            ejv,
+                            consumedResult.getId(),
+                            (resultId, vertexId) ->
+                                    consumedPartitionGroups.get(new Tuple2<>(resultId, vertexId))));
+        }
+
+        return ret;
+    }
+
+    private static Map<IntermediateDataSetID, Integer> getMaxSubpartitionNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        for (IntermediateResult intermediateResult : ejv.getProducedDataSets()) {
+            final int maxNum =
+                    Arrays.stream(intermediateResult.getPartitions())
+                            .map(IntermediateResultPartition::getNumberOfSubpartitions)
+                            .reduce(0, Integer::max);
+            ret.put(intermediateResult.getId(), maxNum);
+        }
+
+        return ret;
+    }
+
+    @VisibleForTesting
+    static int getMaxInputChannelNumForResult(

Review comment:
       Do you mean merging `getMaxInputChannelNumForResult` into `getMaxInputChannelNumsForDynamicGraph`?







[GitHub] [flink] zhuzhurk commented on a change in pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #18376:
URL: https://github.com/apache/flink/pull/18376#discussion_r791440840



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
##########
@@ -148,6 +167,80 @@ private static TaskInputsOutputsDescriptor buildTaskInputsOutputsDescriptor(
         return ret;
     }
 
+    private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        final Map<Tuple2<IntermediateDataSetID, ExecutionVertexID>, ConsumedPartitionGroup>
+                consumedPartitionGroups = new HashMap<>();
+        for (ExecutionVertex vertex : ejv.getTaskVertices()) {
+            for (ConsumedPartitionGroup partitionGroup : vertex.getAllConsumedPartitionGroups()) {
+                consumedPartitionGroups.put(
+                        new Tuple2<>(partitionGroup.getIntermediateDataSetID(), vertex.getID()),
+                        partitionGroup);
+            }
+        }
+
+        for (IntermediateResult consumedResult : ejv.getInputs()) {
+            ret.put(
+                    consumedResult.getId(),
+                    getMaxInputChannelNumForResult(
+                            ejv,
+                            consumedResult.getId(),
+                            (resultId, vertexId) ->
+                                    consumedPartitionGroups.get(new Tuple2<>(resultId, vertexId))));
+        }
+
+        return ret;
+    }
+
+    private static Map<IntermediateDataSetID, Integer> getMaxSubpartitionNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        for (IntermediateResult intermediateResult : ejv.getProducedDataSets()) {
+            final int maxNum =
+                    Arrays.stream(intermediateResult.getPartitions())
+                            .map(IntermediateResultPartition::getNumberOfSubpartitions)
+                            .reduce(0, Integer::max);
+            ret.put(intermediateResult.getId(), maxNum);
+        }
+
+        return ret;
+    }
+
+    @VisibleForTesting
+    static int getMaxInputChannelNumForResult(

Review comment:
       You are right.







[GitHub] [flink] flinkbot commented on pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #18376:
URL: https://github.com/apache/flink/pull/18376#issuecomment-1014266348


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a415c9901fe5addb7e042b2ea760b2de95c736ca UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wanglijie95 commented on a change in pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on a change in pull request #18376:
URL: https://github.com/apache/flink/pull/18376#discussion_r789313291



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
##########
@@ -148,6 +161,78 @@ private static TaskInputsOutputsDescriptor buildTaskInputsOutputsDescriptor(
         return ret;
     }
 
+    private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+        for (IntermediateResult consumedResult : ejv.getInputs()) {
+            ret.put(consumedResult.getId(), getMaxInputChannelNumForResult(ejv, consumedResult));
+        }
+
+        return ret;
+    }
+
+    private static Map<IntermediateDataSetID, Integer> getMaxSubpartitionNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        for (IntermediateResult intermediateResult : ejv.getProducedDataSets()) {
+            final int maxNum =
+                    Arrays.stream(intermediateResult.getPartitions())
+                            .map(IntermediateResultPartition::getNumberOfSubpartitions)
+                            .reduce(0, Integer::max);
+            ret.put(intermediateResult.getId(), maxNum);
+        }
+
+        return ret;
+    }
+
+    @VisibleForTesting
+    static int getMaxInputChannelNumForResult(
+            ExecutionJobVertex ejv, IntermediateResult consumedResult) {
+        DistributionPattern distributionPattern = consumedResult.getConsumingDistributionPattern();
+
+        if (distributionPattern == DistributionPattern.ALL_TO_ALL) {
+            int numChannelsToConsumePerPartition =
+                    getMaxNumOfChannelsForConsuming(consumedResult.getPartitions()[0]);
+            int numConsumedPartitions = consumedResult.getNumberOfAssignedPartitions();
+            return numChannelsToConsumePerPartition * numConsumedPartitions;
+
+        } else if (distributionPattern == DistributionPattern.POINTWISE) {
+            int numPartitions = consumedResult.getNumberOfAssignedPartitions();
+            int numConsumers = ejv.getParallelism();
+            // when using dynamic graph, all partitions have the same number of subpartitions

Review comment:
       When dynamic graph is enabled, the number of subpartitions is computed by `IntermediateResultPartition#computeNumberOfMaxPossiblePartitionConsumers`. Whether or not the consumer parallelism is set, the value returned by `computeNumberOfMaxPossiblePartitionConsumers` should be the same for all partitions.







[GitHub] [flink] wanglijie95 commented on pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on pull request #18376:
URL: https://github.com/apache/flink/pull/18376#issuecomment-1020983443


   Squashed commits.





[GitHub] [flink] wanglijie95 commented on a change in pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on a change in pull request #18376:
URL: https://github.com/apache/flink/pull/18376#discussion_r790425357



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
##########
@@ -148,6 +161,78 @@ private static TaskInputsOutputsDescriptor buildTaskInputsOutputsDescriptor(
         return ret;
     }
 
+    private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+        for (IntermediateResult consumedResult : ejv.getInputs()) {
+            ret.put(consumedResult.getId(), getMaxInputChannelNumForResult(ejv, consumedResult));
+        }
+
+        return ret;
+    }
+
+    private static Map<IntermediateDataSetID, Integer> getMaxSubpartitionNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        for (IntermediateResult intermediateResult : ejv.getProducedDataSets()) {
+            final int maxNum =
+                    Arrays.stream(intermediateResult.getPartitions())
+                            .map(IntermediateResultPartition::getNumberOfSubpartitions)
+                            .reduce(0, Integer::max);
+            ret.put(intermediateResult.getId(), maxNum);
+        }
+
+        return ret;
+    }
+
+    @VisibleForTesting
+    static int getMaxInputChannelNumForResult(
+            ExecutionJobVertex ejv, IntermediateResult consumedResult) {
+        DistributionPattern distributionPattern = consumedResult.getConsumingDistributionPattern();
+
+        if (distributionPattern == DistributionPattern.ALL_TO_ALL) {
+            int numChannelsToConsumePerPartition =
+                    getMaxNumOfChannelsForConsuming(consumedResult.getPartitions()[0]);
+            int numConsumedPartitions = consumedResult.getNumberOfAssignedPartitions();
+            return numChannelsToConsumePerPartition * numConsumedPartitions;
+
+        } else if (distributionPattern == DistributionPattern.POINTWISE) {
+            int numPartitions = consumedResult.getNumberOfAssignedPartitions();
+            int numConsumers = ejv.getParallelism();
+            // when using dynamic graph, all partitions have the same number of subpartitions
+            int numOfSubpartitionsPerPartition =
+                    consumedResult.getPartitions()[0].getNumberOfSubpartitions();
+
+            if (numPartitions >= numConsumers) {
+                // multiple partitions to one consumer
+                int maxConsumedPartitionsPerConsumer =
+                        (int) Math.ceil((double) numPartitions / numConsumers);
+                return numOfSubpartitionsPerPartition * maxConsumedPartitionsPerConsumer;
+            } else {
+                // one partition to multiple consumers
+                int minConsumersPerPartition =
+                        (int) Math.floor((double) numConsumers / numPartitions);
+                return (int)
+                        Math.ceil(
+                                (double) numOfSubpartitionsPerPartition / minConsumersPerPartition);
+            }

Review comment:
       This calculation logic is no longer needed after the modification described above.







[GitHub] [flink] zhuzhurk commented on a change in pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #18376:
URL: https://github.com/apache/flink/pull/18376#discussion_r791440840



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
##########
@@ -148,6 +167,80 @@ private static TaskInputsOutputsDescriptor buildTaskInputsOutputsDescriptor(
         return ret;
     }
 
+    private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        final Map<Tuple2<IntermediateDataSetID, ExecutionVertexID>, ConsumedPartitionGroup>
+                consumedPartitionGroups = new HashMap<>();
+        for (ExecutionVertex vertex : ejv.getTaskVertices()) {
+            for (ConsumedPartitionGroup partitionGroup : vertex.getAllConsumedPartitionGroups()) {
+                consumedPartitionGroups.put(
+                        new Tuple2<>(partitionGroup.getIntermediateDataSetID(), vertex.getID()),
+                        partitionGroup);
+            }
+        }
+
+        for (IntermediateResult consumedResult : ejv.getInputs()) {
+            ret.put(
+                    consumedResult.getId(),
+                    getMaxInputChannelNumForResult(
+                            ejv,
+                            consumedResult.getId(),
+                            (resultId, vertexId) ->
+                                    consumedPartitionGroups.get(new Tuple2<>(resultId, vertexId))));
+        }
+
+        return ret;
+    }
+
+    private static Map<IntermediateDataSetID, Integer> getMaxSubpartitionNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        for (IntermediateResult intermediateResult : ejv.getProducedDataSets()) {
+            final int maxNum =
+                    Arrays.stream(intermediateResult.getPartitions())
+                            .map(IntermediateResultPartition::getNumberOfSubpartitions)
+                            .reduce(0, Integer::max);
+            ret.put(intermediateResult.getId(), maxNum);
+        }
+
+        return ret;
+    }
+
+    @VisibleForTesting
+    static int getMaxInputChannelNumForResult(

Review comment:
       You are right.







[GitHub] [flink] zhuzhurk commented on a change in pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #18376:
URL: https://github.com/apache/flink/pull/18376#discussion_r790282508



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
##########
@@ -148,6 +161,78 @@ private static TaskInputsOutputsDescriptor buildTaskInputsOutputsDescriptor(
         return ret;
     }
 
+    private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+        for (IntermediateResult consumedResult : ejv.getInputs()) {
+            ret.put(consumedResult.getId(), getMaxInputChannelNumForResult(ejv, consumedResult));
+        }
+
+        return ret;
+    }
+
+    private static Map<IntermediateDataSetID, Integer> getMaxSubpartitionNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        for (IntermediateResult intermediateResult : ejv.getProducedDataSets()) {
+            final int maxNum =
+                    Arrays.stream(intermediateResult.getPartitions())
+                            .map(IntermediateResultPartition::getNumberOfSubpartitions)
+                            .reduce(0, Integer::max);
+            ret.put(intermediateResult.getId(), maxNum);
+        }
+
+        return ret;
+    }
+
+    @VisibleForTesting
+    static int getMaxInputChannelNumForResult(
+            ExecutionJobVertex ejv, IntermediateResult consumedResult) {
+        DistributionPattern distributionPattern = consumedResult.getConsumingDistributionPattern();
+
+        if (distributionPattern == DistributionPattern.ALL_TO_ALL) {
+            int numChannelsToConsumePerPartition =
+                    getMaxNumOfChannelsForConsuming(consumedResult.getPartitions()[0]);
+            int numConsumedPartitions = consumedResult.getNumberOfAssignedPartitions();
+            return numChannelsToConsumePerPartition * numConsumedPartitions;
+
+        } else if (distributionPattern == DistributionPattern.POINTWISE) {
+            int numPartitions = consumedResult.getNumberOfAssignedPartitions();
+            int numConsumers = ejv.getParallelism();
+            // when using dynamic graph, all partitions have the same number of subpartitions

Review comment:
       Another thought: can we reuse the logic of `getMaxInputChannelNums`? The edge connections should be the same whether or not it is a dynamic graph. The only difference is the subpartition range that each edge consumes. Furthermore, execution edges from the same `IntermediateResult` to the same sub-task consume the same range.
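
The point of this comment can be illustrated with a small standalone sketch. It is not Flink code: the class `MaxInputChannelSketch`, its method, and both array parameters are hypothetical names chosen for illustration. It assumes that the number of input channels a sub-task needs for one result is the number of partitions it is connected to multiplied by the size of the subpartition range it reads from each of them, and that this range has size 1 in a non-dynamic graph.

```java
/** Hypothetical helper for illustration only; not part of Flink. */
final class MaxInputChannelSketch {

    /**
     * @param consumedPartitions consumedPartitions[i] is the number of partitions of one
     *     result that sub-task i is connected to (the edge count, assumed to be the same
     *     for dynamic and non-dynamic graphs)
     * @param rangeSize rangeSize[i] is the number of subpartitions sub-task i reads from
     *     each of those partitions (assumed to be 1 in a non-dynamic graph)
     * @return the largest channel count over all sub-tasks
     */
    static int maxInputChannels(int[] consumedPartitions, int[] rangeSize) {
        if (consumedPartitions.length != rangeSize.length) {
            throw new IllegalArgumentException("arrays must describe the same sub-tasks");
        }
        int max = 0;
        for (int i = 0; i < consumedPartitions.length; i++) {
            // Every edge of sub-task i consumes the same range, so a product is enough.
            max = Math.max(max, consumedPartitions[i] * rangeSize[i]);
        }
        return max;
    }

    public static void main(String[] args) {
        int[] edges = {2, 2, 1};
        // Same edges, range size 1 everywhere (non-dynamic case).
        System.out.println(maxInputChannels(edges, new int[] {1, 1, 1}));
        // Same edges, wider ranges (dynamic case).
        System.out.println(maxInputChannels(edges, new int[] {3, 3, 4}));
    }
}
```

Under these assumptions only the `rangeSize` input changes between the two graph kinds, which is why the edge-based logic could be shared.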







[GitHub] [flink] flinkbot edited a comment on pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18376:
URL: https://github.com/apache/flink/pull/18376#issuecomment-1014266348


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29529",
       "triggerID" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29992",
       "triggerID" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a415c9901fe5addb7e042b2ea760b2de95c736ca Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29529) 
   * 369e3ea1432e9b033e5660642ab0f86b03ab4d79 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29992) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18376:
URL: https://github.com/apache/flink/pull/18376#issuecomment-1014266348


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29529",
       "triggerID" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29992",
       "triggerID" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "320757eaf30cae2a2b95c06e559c37e068120ef4",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30042",
       "triggerID" : "320757eaf30cae2a2b95c06e559c37e068120ef4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 320757eaf30cae2a2b95c06e559c37e068120ef4 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30042) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18376:
URL: https://github.com/apache/flink/pull/18376#issuecomment-1014266348


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29529",
       "triggerID" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29992",
       "triggerID" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 369e3ea1432e9b033e5660642ab0f86b03ab4d79 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29992) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18376:
URL: https://github.com/apache/flink/pull/18376#issuecomment-1014266348


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29529",
       "triggerID" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a415c9901fe5addb7e042b2ea760b2de95c736ca Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29529) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18376:
URL: https://github.com/apache/flink/pull/18376#issuecomment-1014266348


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29529",
       "triggerID" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29992",
       "triggerID" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "320757eaf30cae2a2b95c06e559c37e068120ef4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30042",
       "triggerID" : "320757eaf30cae2a2b95c06e559c37e068120ef4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "87627f9924e44eb40028822fc427363ea6d845a1",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30130",
       "triggerID" : "87627f9924e44eb40028822fc427363ea6d845a1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 87627f9924e44eb40028822fc427363ea6d845a1 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30130) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wanglijie95 commented on a change in pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on a change in pull request #18376:
URL: https://github.com/apache/flink/pull/18376#discussion_r790424936



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
##########
@@ -148,6 +161,78 @@ private static TaskInputsOutputsDescriptor buildTaskInputsOutputsDescriptor(
         return ret;
     }
 
+    private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+        for (IntermediateResult consumedResult : ejv.getInputs()) {
+            ret.put(consumedResult.getId(), getMaxInputChannelNumForResult(ejv, consumedResult));
+        }
+
+        return ret;
+    }
+
+    private static Map<IntermediateDataSetID, Integer> getMaxSubpartitionNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        for (IntermediateResult intermediateResult : ejv.getProducedDataSets()) {
+            final int maxNum =
+                    Arrays.stream(intermediateResult.getPartitions())
+                            .map(IntermediateResultPartition::getNumberOfSubpartitions)
+                            .reduce(0, Integer::max);
+            ret.put(intermediateResult.getId(), maxNum);
+        }
+
+        return ret;
+    }
+
+    @VisibleForTesting
+    static int getMaxInputChannelNumForResult(
+            ExecutionJobVertex ejv, IntermediateResult consumedResult) {
+        DistributionPattern distributionPattern = consumedResult.getConsumingDistributionPattern();
+
+        if (distributionPattern == DistributionPattern.ALL_TO_ALL) {
+            int numChannelsToConsumePerPartition =
+                    getMaxNumOfChannelsForConsuming(consumedResult.getPartitions()[0]);
+            int numConsumedPartitions = consumedResult.getNumberOfAssignedPartitions();
+            return numChannelsToConsumePerPartition * numConsumedPartitions;
+
+        } else if (distributionPattern == DistributionPattern.POINTWISE) {
+            int numPartitions = consumedResult.getNumberOfAssignedPartitions();
+            int numConsumers = ejv.getParallelism();
+            // when using dynamic graph, all partitions have the same number of subpartitions

Review comment:
       You are right. I will change it to use `TaskDeploymentDescriptorFactory#computeConsumedSubpartitionRange` to compute the subpartition range for each consumer, and then calculate the max number of channels from that range.
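
A minimal sketch of the approach described in this reply, written as a standalone class. It does not reproduce the actual `computeConsumedSubpartitionRange` signature or behaviour: `SubpartitionRangeSketch`, `consumedRange` and `maxRangeSize` are hypothetical names, and the even split of subpartitions across consumers is an assumption made for illustration.

```java
/** Hypothetical stand-in for illustration only; not the Flink implementation. */
final class SubpartitionRangeSketch {

    /**
     * Returns the inclusive {start, end} subpartition range read by one consumer,
     * assuming the subpartitions are split as evenly as possible across consumers.
     */
    static int[] consumedRange(int consumerIndex, int numConsumers, int numSubpartitions) {
        if (consumerIndex < 0 || consumerIndex >= numConsumers) {
            throw new IllegalArgumentException("consumer index out of bounds");
        }
        if (numConsumers > numSubpartitions) {
            throw new IllegalArgumentException("more consumers than subpartitions");
        }
        // Integer division; a sketch only, so int overflow for huge inputs is ignored.
        int start = consumerIndex * numSubpartitions / numConsumers;
        int nextStart = (consumerIndex + 1) * numSubpartitions / numConsumers;
        return new int[] {start, nextStart - 1};
    }

    /** Largest range size over all consumers, i.e. max channels needed per partition. */
    static int maxRangeSize(int numConsumers, int numSubpartitions) {
        int max = 0;
        for (int i = 0; i < numConsumers; i++) {
            int[] range = consumedRange(i, numConsumers, numSubpartitions);
            max = Math.max(max, range[1] - range[0] + 1);
        }
        return max;
    }

    public static void main(String[] args) {
        // 10 subpartitions over 3 consumers: ranges [0,2], [3,5], [6,9].
        for (int i = 0; i < 3; i++) {
            int[] r = consumedRange(i, 3, 10);
            System.out.println("consumer " + i + " reads [" + r[0] + ", " + r[1] + "]");
        }
        System.out.println("max range size: " + maxRangeSize(3, 10));
    }
}
```

Computing the range per consumer and taking the maximum avoids assuming that every consumer reads the same number of subpartitions.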







[GitHub] [flink] flinkbot edited a comment on pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18376:
URL: https://github.com/apache/flink/pull/18376#issuecomment-1014266348


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29529",
       "triggerID" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a415c9901fe5addb7e042b2ea760b2de95c736ca Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29529) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] zhuzhurk commented on a change in pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #18376:
URL: https://github.com/apache/flink/pull/18376#discussion_r786618871



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
##########
@@ -148,6 +161,78 @@ private static TaskInputsOutputsDescriptor buildTaskInputsOutputsDescriptor(
         return ret;
     }
 
+    private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+        for (IntermediateResult consumedResult : ejv.getInputs()) {
+            ret.put(consumedResult.getId(), getMaxInputChannelNumForResult(ejv, consumedResult));
+        }
+
+        return ret;
+    }
+
+    private static Map<IntermediateDataSetID, Integer> getMaxSubpartitionNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        for (IntermediateResult intermediateResult : ejv.getProducedDataSets()) {
+            final int maxNum =
+                    Arrays.stream(intermediateResult.getPartitions())
+                            .map(IntermediateResultPartition::getNumberOfSubpartitions)
+                            .reduce(0, Integer::max);
+            ret.put(intermediateResult.getId(), maxNum);
+        }
+
+        return ret;
+    }
+
+    @VisibleForTesting
+    static int getMaxInputChannelNumForResult(
+            ExecutionJobVertex ejv, IntermediateResult consumedResult) {
+        DistributionPattern distributionPattern = consumedResult.getConsumingDistributionPattern();
+
+        if (distributionPattern == DistributionPattern.ALL_TO_ALL) {
+            int numChannelsToConsumePerPartition =
+                    getMaxNumOfChannelsForConsuming(consumedResult.getPartitions()[0]);
+            int numConsumedPartitions = consumedResult.getNumberOfAssignedPartitions();
+            return numChannelsToConsumePerPartition * numConsumedPartitions;
+
+        } else if (distributionPattern == DistributionPattern.POINTWISE) {
+            int numPartitions = consumedResult.getNumberOfAssignedPartitions();
+            int numConsumers = ejv.getParallelism();
+            // when using dynamic graph, all partitions have the same number of subpartitions

Review comment:
       This is not true for the pointwise case. One example is when the consumer parallelism is set by the user in a dynamic graph.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
##########
@@ -148,6 +161,78 @@ private static TaskInputsOutputsDescriptor buildTaskInputsOutputsDescriptor(
         return ret;
     }
 
+    private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+        for (IntermediateResult consumedResult : ejv.getInputs()) {
+            ret.put(consumedResult.getId(), getMaxInputChannelNumForResult(ejv, consumedResult));
+        }
+
+        return ret;
+    }
+
+    private static Map<IntermediateDataSetID, Integer> getMaxSubpartitionNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        for (IntermediateResult intermediateResult : ejv.getProducedDataSets()) {
+            final int maxNum =
+                    Arrays.stream(intermediateResult.getPartitions())
+                            .map(IntermediateResultPartition::getNumberOfSubpartitions)
+                            .reduce(0, Integer::max);
+            ret.put(intermediateResult.getId(), maxNum);
+        }
+
+        return ret;
+    }
+
+    @VisibleForTesting
+    static int getMaxInputChannelNumForResult(
+            ExecutionJobVertex ejv, IntermediateResult consumedResult) {
+        DistributionPattern distributionPattern = consumedResult.getConsumingDistributionPattern();
+
+        if (distributionPattern == DistributionPattern.ALL_TO_ALL) {
+            int numChannelsToConsumePerPartition =
+                    getMaxNumOfChannelsForConsuming(consumedResult.getPartitions()[0]);
+            int numConsumedPartitions = consumedResult.getNumberOfAssignedPartitions();
+            return numChannelsToConsumePerPartition * numConsumedPartitions;
+
+        } else if (distributionPattern == DistributionPattern.POINTWISE) {
+            int numPartitions = consumedResult.getNumberOfAssignedPartitions();
+            int numConsumers = ejv.getParallelism();
+            // when using dynamic graph, all partitions have the same number of subpartitions
+            int numOfSubpartitionsPerPartition =
+                    consumedResult.getPartitions()[0].getNumberOfSubpartitions();
+
+            if (numPartitions >= numConsumers) {
+                // multiple partitions to one consumer
+                int maxConsumedPartitionsPerConsumer =
+                        (int) Math.ceil((double) numPartitions / numConsumers);
+                return numOfSubpartitionsPerPartition * maxConsumedPartitionsPerConsumer;
+            } else {
+                // one partition to multiple consumers
+                int minConsumersPerPartition =
+                        (int) Math.floor((double) numConsumers / numPartitions);
+                return (int)
+                        Math.ceil(
+                                (double) numOfSubpartitionsPerPartition / minConsumersPerPartition);
+            }

Review comment:
       Is it possible to replace this if block with `EdgeManagerBuildUtil.computeMaxEdgesToTargetExecutionVertex`?
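
To make the arithmetic of the quoted `POINTWISE` branch concrete, here is a standalone sketch that mirrors it with plain integers. The class and method names are hypothetical, and the single `subpartitionsPerPartition` parameter bakes in the assumption questioned in the first comment above, namely that every partition has the same number of subpartitions.

```java
/** Hypothetical restatement of the quoted branch; for illustration only. */
final class PointwiseChannelSketch {

    static int maxInputChannels(
            int numPartitions, int numConsumers, int subpartitionsPerPartition) {
        if (numPartitions >= numConsumers) {
            // Multiple partitions feed one consumer, which reads every subpartition.
            int maxConsumedPartitionsPerConsumer =
                    (int) Math.ceil((double) numPartitions / numConsumers);
            return subpartitionsPerPartition * maxConsumedPartitionsPerConsumer;
        } else {
            // One partition feeds several consumers, which share its subpartitions.
            int minConsumersPerPartition =
                    (int) Math.floor((double) numConsumers / numPartitions);
            return (int)
                    Math.ceil((double) subpartitionsPerPartition / minConsumersPerPartition);
        }
    }

    public static void main(String[] args) {
        // 5 partitions, 2 consumers, 3 subpartitions each: ceil(5/2) * 3.
        System.out.println(maxInputChannels(5, 2, 3));
        // 2 partitions, 5 consumers, 4 subpartitions each: ceil(4 / floor(5/2)).
        System.out.println(maxInputChannels(2, 5, 4));
    }
}
```

If partitions can differ in subpartition count, the single per-partition figure used here no longer describes the job, which is the concern raised in this review.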







[GitHub] [flink] wanglijie95 commented on a change in pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on a change in pull request #18376:
URL: https://github.com/apache/flink/pull/18376#discussion_r790425616



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java
##########
@@ -148,6 +161,78 @@ private static TaskInputsOutputsDescriptor buildTaskInputsOutputsDescriptor(
         return ret;
     }
 
+    private static Map<IntermediateDataSetID, Integer> getMaxInputChannelNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+        for (IntermediateResult consumedResult : ejv.getInputs()) {
+            ret.put(consumedResult.getId(), getMaxInputChannelNumForResult(ejv, consumedResult));
+        }
+
+        return ret;
+    }
+
+    private static Map<IntermediateDataSetID, Integer> getMaxSubpartitionNumsForDynamicGraph(
+            ExecutionJobVertex ejv) {
+
+        Map<IntermediateDataSetID, Integer> ret = new HashMap<>();
+
+        for (IntermediateResult intermediateResult : ejv.getProducedDataSets()) {
+            final int maxNum =
+                    Arrays.stream(intermediateResult.getPartitions())
+                            .map(IntermediateResultPartition::getNumberOfSubpartitions)
+                            .reduce(0, Integer::max);
+            ret.put(intermediateResult.getId(), maxNum);
+        }
+
+        return ret;
+    }
+
+    @VisibleForTesting
+    static int getMaxInputChannelNumForResult(
+            ExecutionJobVertex ejv, IntermediateResult consumedResult) {
+        DistributionPattern distributionPattern = consumedResult.getConsumingDistributionPattern();
+
+        if (distributionPattern == DistributionPattern.ALL_TO_ALL) {
+            int numChannelsToConsumePerPartition =
+                    getMaxNumOfChannelsForConsuming(consumedResult.getPartitions()[0]);
+            int numConsumedPartitions = consumedResult.getNumberOfAssignedPartitions();
+            return numChannelsToConsumePerPartition * numConsumedPartitions;
+
+        } else if (distributionPattern == DistributionPattern.POINTWISE) {
+            int numPartitions = consumedResult.getNumberOfAssignedPartitions();
+            int numConsumers = ejv.getParallelism();
+            // when using dynamic graph, all partitions have the same number of subpartitions

Review comment:
       > e.g. if someone later tried to optimize `IntermediateResultPartition#computeNumberOfSubpartitions()` to avoid creating extra subpartitions when the parallelism of the downstream vertex is already known, this assumption may break. However, he/she may not be aware of the changes here, so the assumption here may stay unchanged.
   
   You are right. I will change it to use `TaskDeploymentDescriptorFactory#computeConsumedSubpartitionRange` to compute the subpartition range for each consumer, and then calculate the max number of channels from those ranges.
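
The approach described here, deriving a subpartition range per consumer and taking the largest range size, can be sketched as follows. This is an illustration only: `evenSplitRange` is a hypothetical stand-in that assumes subpartitions are split among consumers by integer division. It is not the signature, and not necessarily the logic, of `TaskDeploymentDescriptorFactory#computeConsumedSubpartitionRange`.

```java
class SubpartitionRangeSketch {

    /**
     * Hypothetical inclusive range {start, end} of subpartitions read by one consumer.
     * Assumes an even split by integer division and numSubpartitions >= numConsumers.
     */
    static int[] evenSplitRange(int consumerIndex, int numConsumers, int numSubpartitions) {
        int start = consumerIndex * numSubpartitions / numConsumers;
        int nextStart = (consumerIndex + 1) * numSubpartitions / numConsumers;
        return new int[] {start, nextStart - 1};
    }

    /** Largest number of channels any single consumer needs for one partition. */
    static int maxChannelsPerPartition(int numConsumers, int numSubpartitions) {
        int max = 0;
        for (int i = 0; i < numConsumers; i++) {
            int[] range = evenSplitRange(i, numConsumers, numSubpartitions);
            max = Math.max(max, range[1] - range[0] + 1);
        }
        return max;
    }

    public static void main(String[] args) {
        // 10 subpartitions over 3 consumers: ranges [0,2], [3,5], [6,9]
        System.out.println(maxChannelsPerPartition(3, 10));
    }
}
```

Computing the maximum from the actual ranges avoids hard-coding the assumption that all partitions expose the same number of subpartitions, which is the concern raised in the quoted comment.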







[GitHub] [flink] flinkbot edited a comment on pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18376:
URL: https://github.com/apache/flink/pull/18376#issuecomment-1014266348


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29529",
       "triggerID" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29992",
       "triggerID" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "320757eaf30cae2a2b95c06e559c37e068120ef4",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30042",
       "triggerID" : "320757eaf30cae2a2b95c06e559c37e068120ef4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 369e3ea1432e9b033e5660642ab0f86b03ab4d79 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29992) 
   * 320757eaf30cae2a2b95c06e559c37e068120ef4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30042) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18376:
URL: https://github.com/apache/flink/pull/18376#issuecomment-1014266348


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29529",
       "triggerID" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29992",
       "triggerID" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "320757eaf30cae2a2b95c06e559c37e068120ef4",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30042",
       "triggerID" : "320757eaf30cae2a2b95c06e559c37e068120ef4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "87627f9924e44eb40028822fc427363ea6d845a1",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30130",
       "triggerID" : "87627f9924e44eb40028822fc427363ea6d845a1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 320757eaf30cae2a2b95c06e559c37e068120ef4 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30042) 
   * 87627f9924e44eb40028822fc427363ea6d845a1 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30130) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18376:
URL: https://github.com/apache/flink/pull/18376#issuecomment-1014266348


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29529",
       "triggerID" : "a415c9901fe5addb7e042b2ea760b2de95c736ca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29992",
       "triggerID" : "369e3ea1432e9b033e5660642ab0f86b03ab4d79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "320757eaf30cae2a2b95c06e559c37e068120ef4",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30042",
       "triggerID" : "320757eaf30cae2a2b95c06e559c37e068120ef4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "87627f9924e44eb40028822fc427363ea6d845a1",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "87627f9924e44eb40028822fc427363ea6d845a1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 320757eaf30cae2a2b95c06e559c37e068120ef4 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30042) 
   * 87627f9924e44eb40028822fc427363ea6d845a1 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #18376: [FLINK-25668][runtime] Support calcuate network memory for dynamic graph

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #18376:
URL: https://github.com/apache/flink/pull/18376#issuecomment-1014267921


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit a415c9901fe5addb7e042b2ea760b2de95c736ca (Mon Jan 17 08:41:15 UTC 2022)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>

