Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/26 09:11:49 UTC

[GitHub] [flink] wanglijie95 opened a new pull request, #21162: [FLINK-29665][runtime] Support flexible subpartition range division

wanglijie95 opened a new pull request, #21162:
URL: https://github.com/apache/flink/pull/21162

   ## What is the purpose of the change
   Support flexible subpartition range division
   
   ## Verifying this change
   This change is verified by newly added tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (**no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**)
     - The serializers: (**no**)
     - The runtime per-record code paths (performance sensitive): (**no**)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**no**)
     - The S3 file system connector: (**no**)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**no**)
     - If yes, how is the feature documented? (**not applicable**)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zhuzhurk commented on a diff in pull request #21162: [FLINK-29665][runtime] Support flexible subpartition range division

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #21162:
URL: https://github.com/apache/flink/pull/21162#discussion_r1049598360


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SubpartitionIndexRange.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+/** This class represents a range of subpartition indexes. The range is inclusive. */
+public class SubpartitionIndexRange extends IndexRange {
+
+    public SubpartitionIndexRange(int startIndex, int endIndex) {

Review Comment:
   Can we just use IndexRange without introducing PartitionIndexRange/SubpartitionIndexRange?
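
   For illustration, a minimal sketch of what the call sites could look like with only the base class (this assumes IndexRange has an IndexRange(int startIndex, int endIndex) constructor and that TaskInputInfo would be changed to accept plain IndexRange; the surrounding names are taken from this PR):

       // Hypothetical call sites if the two subclasses were dropped.
       IndexRange partitionRange = new IndexRange(0, sourceCount - 1);
       IndexRange subpartitionRange = new IndexRange(start, nextStart - 1);
       taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange));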



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInputInfo.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class describes the input information (partitions and subpartitions that belong to the
+ * same intermediate result) of a subtask.
+ */
+public class TaskInputInfo {

Review Comment:
   Compared to TaskInputInfo & VertexInputInfo, ExecutionVertexInputInfo & JobVertexInputInfo may be easier to understand.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java:
##########
@@ -120,4 +120,6 @@ void notifySchedulerNgAboutInternalTaskFailure(
     /** Get the shuffle descriptors of the cluster partitions ordered by partition number. */
     List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
             IntermediateDataSetID intermediateResultPartition);
+
+    VertexInputInfo getVertexInputInfo(JobVertexID jobVertexId, IntermediateDataSetID resultId);

Review Comment:
   A Javadoc is needed.
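
   For example, something along these lines (wording is only a suggestion, not taken from the PR):

       /**
        * Get the input info of a given intermediate result consumed by a given job vertex.
        *
        * @param jobVertexId the id of the consumer job vertex
        * @param resultId the id of the consumed intermediate result
        * @return the {@link VertexInputInfo} of the given job vertex and intermediate result
        */
       VertexInputInfo getVertexInputInfo(JobVertexID jobVertexId, IntermediateDataSetID resultId);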



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoStore.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A store that contains all the {@link VertexInputInfo}s. */
+public class VertexInputInfoStore {
+
+    private final Map<JobVertexID, Map<IntermediateDataSetID, VertexInputInfo>> vertexInputInfos =
+            new HashMap<>();
+
+    /**
+     * Put a {@link VertexInputInfo}.
+     *
+     * @param jobVertexId the job vertex id
+     * @param resultId the intermediate result id
+     * @param info the {@link VertexInputInfo} to put
+     */
+    public void put(JobVertexID jobVertexId, IntermediateDataSetID resultId, VertexInputInfo info) {
+        checkNotNull(jobVertexId);
+        checkNotNull(resultId);
+        checkNotNull(info);
+
+        vertexInputInfos.compute(
+                jobVertexId,
+                (ignored, inputInfos) -> {
+                    if (inputInfos == null) {
+                        inputInfos = new HashMap<>();
+                    }
+
+                    // Note that a job vertex can have 2 inputs with the same IntermediateDataSetID

Review Comment:
   > a job vertex can have 2 inputs with the same IntermediateDataSetID
   
   What's the point of adding the comment here? Should we add it to the get method and also explain that, if a vertex has multiple job edges connecting to the same result, their distribution pattern must be the same and therefore the VertexInputInfo will be the same?
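
   As a rough sketch of that suggestion (the get method is assumed to mirror the put method above; it is not shown in this hunk):

       /**
        * Get a {@link VertexInputInfo}.
        *
        * <p>Note that a job vertex may have multiple job edges connecting to the same
        * intermediate result. Their distribution patterns must be the same, so they
        * share the same {@link VertexInputInfo}.
        */
       public VertexInputInfo get(JobVertexID jobVertexId, IntermediateDataSetID resultId) {
           return checkNotNull(vertexInputInfos.get(jobVertexId)).get(resultId);
       }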



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultInfo.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+public interface IntermediateResultInfo {
+    /**
+     * Get the intermediate result id.
+     *
+     * @return the intermediate result id
+     */
+    IntermediateDataSetID getResultId();
+
+    /**
+     * Whether it is a broadcast result.
+     *
+     * @return whether it is a broadcast result
+     */
+    boolean isBroadcast();
+
+    /**
+     * Whether it is a pointwise result.
+     *
+     * @return whether it is a pointwise result
+     */
+    boolean isPointwise();
+
+    /**
+     * Get number of partitions for this result.
+     *
+     * @return the number of partitions in this result
+     */
+    int getNumPartitions();
+
+    /**
+     * Get number of subpartitions for the given partition.
+     *
+     * @param partitionIndex the partition index
+     * @return the number of partitions of the partition

Review Comment:
   partitions of -> subpartitions of
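
   i.e. the corrected Javadoc line would read:

       * @return the number of subpartitions of the partition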



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoComputationUtils.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Util to compute {@link VertexInputInfo}s for an execution job vertex. */
+public class VertexInputInfoComputationUtils {
+
+    public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos(
+            ExecutionJobVertex ejv,
+            Function<IntermediateDataSetID, IntermediateResult> intermediateResultRetriever) {
+        checkState(ejv.isParallelismDecided());
+        final List<IntermediateResultInfo> intermediateResultInfos =
+                ejv.getJobVertex().getInputs().stream()
+                        .map(JobEdge::getSourceId)
+                        .map(intermediateResultRetriever)
+                        .map(IntermediateResultWrapper::new)
+                        .collect(Collectors.toList());
+        return computeVertexInputInfos(
+                ejv.getParallelism(), intermediateResultInfos, ejv.getGraph().isDynamic());
+    }
+
+    public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos(
+            int parallelism, List<IntermediateResultInfo> inputs, boolean isDynamicGraph) {
+
+        checkArgument(parallelism > 0);
+        final Map<IntermediateDataSetID, VertexInputInfo> vertexInputInfos = new LinkedHashMap<>();
+
+        for (IntermediateResultInfo input : inputs) {
+            int sourceParallelism = input.getNumPartitions();
+
+            if (input.isPointwise()) {
+                vertexInputInfos.putIfAbsent(
+                        input.getResultId(),
+                        computeVertexInputInfoForPointwise(
+                                sourceParallelism,
+                                parallelism,
+                                input::getNumSubpartitions,
+                                isDynamicGraph));
+            } else {
+                vertexInputInfos.putIfAbsent(
+                        input.getResultId(),
+                        computeVertexInputInfoForAllToAll(
+                                sourceParallelism,
+                                parallelism,
+                                input::getNumSubpartitions,
+                                isDynamicGraph,
+                                input.isBroadcast()));
+            }
+        }
+
+        return vertexInputInfos;
+    }
+
+    /**
+     * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#POINTWISE} edge. This
+     * computation algorithm will evenly distribute subpartitions to downstream subtasks according
+     * to the number of subpartitions. Different downstream subtasks consume roughly the same number
+     * of subpartitions.
+     *
+     * @param sourceCount the parallelism of upstream
+     * @param targetCount the parallelism of downstream
+     * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions
+     * @param isDynamicGraph whether the graph is dynamic
+     * @return the computed {@link VertexInputInfo}
+     */
+    @VisibleForTesting
+    static VertexInputInfo computeVertexInputInfoForPointwise(
+            int sourceCount,
+            int targetCount,
+            Function<Integer, Integer> numOfSubpartitionsRetriever,
+            boolean isDynamicGraph) {
+
+        final List<TaskInputInfo> taskInputInfos = new ArrayList<>();
+
+        if (sourceCount >= targetCount) {
+            for (int index = 0; index < targetCount; index++) {
+
+                int start = index * sourceCount / targetCount;
+                int end = (index + 1) * sourceCount / targetCount;
+
+                PartitionIndexRange partitionRange = new PartitionIndexRange(start, end - 1);
+                SubpartitionIndexRange subpartitionRange =
+                        computeConsumedSubpartitionRange(
+                                index,
+                                1,
+                                numOfSubpartitionsRetriever.apply(start),
+                                isDynamicGraph,
+                                false);
+                taskInputInfos.add(new TaskInputInfo(index, partitionRange, subpartitionRange));
+            }
+        } else {
+            for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {
+
+                int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;
+                int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;
+                int numConsumers = end - start;
+
+                for (int i = start; i < end; i++) {
+                    PartitionIndexRange partitionRange =
+                            new PartitionIndexRange(partitionNum, partitionNum);
+                    SubpartitionIndexRange subpartitionRange =
+                            computeConsumedSubpartitionRange(
+                                    i,
+                                    numConsumers,
+                                    numOfSubpartitionsRetriever.apply(partitionNum),
+                                    isDynamicGraph,
+                                    false);
+                    taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange));
+                }
+            }
+        }
+        return new VertexInputInfo(taskInputInfos);
+    }
+
+    /**
+     * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#ALL_TO_ALL} edge. This
+     * computation algorithm will evenly distribute subpartitions to downstream subtasks according
+     * to the number of subpartitions. Different downstream subtasks consume roughly the same number
+     * of subpartitions.
+     *
+     * @param sourceCount the parallelism of upstream
+     * @param targetCount the parallelism of downstream
+     * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions
+     * @param isDynamicGraph whether the graph is dynamic
+     * @param isBroadcast whether the edge is broadcast
+     * @return the computed {@link VertexInputInfo}
+     */
+    @VisibleForTesting
+    static VertexInputInfo computeVertexInputInfoForAllToAll(
+            int sourceCount,
+            int targetCount,
+            Function<Integer, Integer> numOfSubpartitionsRetriever,
+            boolean isDynamicGraph,
+            boolean isBroadcast) {
+        final List<TaskInputInfo> taskInputInfos = new ArrayList<>();
+        for (int i = 0; i < targetCount; ++i) {
+            PartitionIndexRange partitionRange = new PartitionIndexRange(0, sourceCount - 1);
+            SubpartitionIndexRange subpartitionRange =
+                    computeConsumedSubpartitionRange(
+                            i,
+                            targetCount,
+                            numOfSubpartitionsRetriever.apply(0),
+                            isDynamicGraph,
+                            isBroadcast);
+            taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange));
+        }
+        return new VertexInputInfo(taskInputInfos);
+    }
+
+    /**
+     * Compute the consumed subpartition range for a subtask. This computation algorithm will evenly
+     * distribute subpartitions to downstream subtasks according to the number of subpartitions.
+     * Different downstream subtasks consume roughly the same number of subpartitions.
+     *
+     * @param consumerSubtaskIndex the subtask index
+     * @param numConsumers the total number of consumers
+     * @param numSubpartitions the total number of subpartitions
+     * @param isDynamicGraph whether the graph is dynamic
+     * @param isBroadcast whether the edge is broadcast
+     * @return the computed subpartition range
+     */
+    @VisibleForTesting
+    static SubpartitionIndexRange computeConsumedSubpartitionRange(
+            int consumerSubtaskIndex,
+            int numConsumers,
+            int numSubpartitions,
+            boolean isDynamicGraph,
+            boolean isBroadcast) {
+        int consumerIndex = consumerSubtaskIndex % numConsumers;
+        if (!isDynamicGraph) {
+            return new SubpartitionIndexRange(consumerIndex, consumerIndex);
+        } else {
+            if (isBroadcast) {
+                // Broadcast results have only one subpartition, which can be consumed multiple times.
+                checkArgument(numSubpartitions == 1);
+                return new SubpartitionIndexRange(0, 0);
+            } else {
+                checkArgument(consumerIndex < numConsumers);
+                checkArgument(numConsumers <= numSubpartitions);
+
+                int start = consumerIndex * numSubpartitions / numConsumers;
+                int nextStart = (consumerIndex + 1) * numSubpartitions / numConsumers;
+
+                return new SubpartitionIndexRange(start, nextStart - 1);
+            }
+        }
+    }
+
+    private static class IntermediateResultWrapper implements IntermediateResultInfo {
+        private final IntermediateResult intermediateResult;
+
+        IntermediateResultWrapper(IntermediateResult intermediateResult) {
+            this.intermediateResult = checkNotNull(intermediateResult);
+        }
+
+        @Override
+        public IntermediateDataSetID getResultId() {
+            return intermediateResult.getId();
+        }
+
+        @Override
+        public boolean isBroadcast() {
+            return intermediateResult.isBroadcast();
+        }
+
+        @Override
+        public boolean isPointwise() {
+            return intermediateResult.getConsumingDistributionPattern()
+                    == DistributionPattern.POINTWISE;
+        }
+
+        @Override
+        public int getNumPartitions() {
+            return intermediateResult.getNumberOfAssignedPartitions();
+        }
+
+        @Override
+        public int getNumSubpartitions(int partitionIndex) {
+            boolean isDynamicGraph = intermediateResult.getProducer().getGraph().isDynamic();
+            // Note that for non-dynamic graph, the num of subpartition has not been decided at

Review Comment:
   non-dynamic -> dynamic
   
   When connecting to a result, the result should have been fully produced. Why is the `numberOfSubpartitions` not decided yet? 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInputInfo.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class describes the input information (partitions and subpartitions that belong to the
+ * same intermediate result) of a subtask.
+ */
+public class TaskInputInfo {
+    private final int consumerIndex;
+
+    private final PartitionIndexRange partitionIndexRange;
+
+    private final SubpartitionIndexRange subpartitionIndexRange;
+
+    public TaskInputInfo(
+            final int consumerIndex,
+            final PartitionIndexRange partitionIndexRange,
+            final SubpartitionIndexRange subpartitionIndexRange) {
+        this.consumerIndex = consumerIndex;
+        this.partitionIndexRange = checkNotNull(partitionIndexRange);
+        this.subpartitionIndexRange = checkNotNull(subpartitionIndexRange);
+    }
+
+    /** Get the subpartition range this subtask should consume. */
+    public SubpartitionIndexRange getSubpartitionIndexRange() {
+        return checkNotNull(subpartitionIndexRange);

Review Comment:
   No need to do `checkNotNull` because it's already checked in the constructor and the field is immutable.
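
   i.e. the getter can simply return the field (sketch of the suggested simplification):

       /** Get the subpartition range this subtask should consume. */
       public SubpartitionIndexRange getSubpartitionIndexRange() {
           return subpartitionIndexRange;
       }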



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInputInfo.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class describes the input information (partitions and subpartitions that belong to the
+ * same intermediate result) of a subtask.
+ */
+public class TaskInputInfo {
+    private final int consumerIndex;

Review Comment:
   -> subtaskIndex 
   
   I think it's easier to understand.
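
   i.e. (sketch of the suggested rename):

       /** Index of the consumer subtask within the consumer job vertex. */
       private final int subtaskIndex;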



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoComputationUtils.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Util to compute {@link VertexInputInfo}s for an execution job vertex. */
+public class VertexInputInfoComputationUtils {
+
+    public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos(
+            ExecutionJobVertex ejv,
+            Function<IntermediateDataSetID, IntermediateResult> intermediateResultRetriever) {
+        checkState(ejv.isParallelismDecided());
+        final List<IntermediateResultInfo> intermediateResultInfos =
+                ejv.getJobVertex().getInputs().stream()
+                        .map(JobEdge::getSourceId)
+                        .map(intermediateResultRetriever)
+                        .map(IntermediateResultWrapper::new)
+                        .collect(Collectors.toList());
+        return computeVertexInputInfos(
+                ejv.getParallelism(), intermediateResultInfos, ejv.getGraph().isDynamic());
+    }
+
+    public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos(
+            int parallelism, List<IntermediateResultInfo> inputs, boolean isDynamicGraph) {
+
+        checkArgument(parallelism > 0);
+        final Map<IntermediateDataSetID, VertexInputInfo> vertexInputInfos = new LinkedHashMap<>();
+
+        for (IntermediateResultInfo input : inputs) {
+            int sourceParallelism = input.getNumPartitions();
+
+            if (input.isPointwise()) {
+                vertexInputInfos.putIfAbsent(
+                        input.getResultId(),
+                        computeVertexInputInfoForPointwise(
+                                sourceParallelism,
+                                parallelism,
+                                input::getNumSubpartitions,
+                                isDynamicGraph));
+            } else {
+                vertexInputInfos.putIfAbsent(
+                        input.getResultId(),
+                        computeVertexInputInfoForAllToAll(
+                                sourceParallelism,
+                                parallelism,
+                                input::getNumSubpartitions,
+                                isDynamicGraph,
+                                input.isBroadcast()));
+            }
+        }
+
+        return vertexInputInfos;
+    }
+
+    /**
+     * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#POINTWISE} edge. This
+     * computation algorithm will evenly distribute subpartitions to downstream subtasks according
+     * to the number of subpartitions. Different downstream subtasks consume roughly the same number
+     * of subpartitions.
+     *
+     * @param sourceCount the parallelism of upstream
+     * @param targetCount the parallelism of downstream
+     * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions
+     * @param isDynamicGraph whether the graph is dynamic
+     * @return the computed {@link VertexInputInfo}
+     */
+    @VisibleForTesting
+    static VertexInputInfo computeVertexInputInfoForPointwise(
+            int sourceCount,
+            int targetCount,
+            Function<Integer, Integer> numOfSubpartitionsRetriever,
+            boolean isDynamicGraph) {
+
+        final List<TaskInputInfo> taskInputInfos = new ArrayList<>();
+
+        if (sourceCount >= targetCount) {
+            for (int index = 0; index < targetCount; index++) {
+
+                int start = index * sourceCount / targetCount;
+                int end = (index + 1) * sourceCount / targetCount;
+
+                PartitionIndexRange partitionRange = new PartitionIndexRange(start, end - 1);
+                SubpartitionIndexRange subpartitionRange =
+                        computeConsumedSubpartitionRange(
+                                index,
+                                1,
+                                numOfSubpartitionsRetriever.apply(start),
+                                isDynamicGraph,
+                                false);
+                taskInputInfos.add(new TaskInputInfo(index, partitionRange, subpartitionRange));
+            }
+        } else {
+            for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {
+
+                int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;
+                int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;
+                int numConsumers = end - start;
+
+                for (int i = start; i < end; i++) {
+                    PartitionIndexRange partitionRange =
+                            new PartitionIndexRange(partitionNum, partitionNum);
+                    SubpartitionIndexRange subpartitionRange =
+                            computeConsumedSubpartitionRange(
+                                    i,
+                                    numConsumers,
+                                    numOfSubpartitionsRetriever.apply(partitionNum),
+                                    isDynamicGraph,
+                                    false);
+                    taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange));
+                }
+            }
+        }
+        return new VertexInputInfo(taskInputInfos);
+    }
+
+    /**
+     * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#ALL_TO_ALL} edge. This
+     * computation algorithm will evenly distribute subpartitions to downstream subtasks according
+     * to the number of subpartitions. Different downstream subtasks consume roughly the same number
+     * of subpartitions.
+     *
+     * @param sourceCount the parallelism of upstream
+     * @param targetCount the parallelism of downstream
+     * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions
+     * @param isDynamicGraph whether the graph is dynamic
+     * @param isBroadcast whether the edge is broadcast
+     * @return the computed {@link VertexInputInfo}
+     */
+    @VisibleForTesting
+    static VertexInputInfo computeVertexInputInfoForAllToAll(
+            int sourceCount,
+            int targetCount,
+            Function<Integer, Integer> numOfSubpartitionsRetriever,
+            boolean isDynamicGraph,
+            boolean isBroadcast) {
+        final List<TaskInputInfo> taskInputInfos = new ArrayList<>();
+        for (int i = 0; i < targetCount; ++i) {
+            PartitionIndexRange partitionRange = new PartitionIndexRange(0, sourceCount - 1);
+            SubpartitionIndexRange subpartitionRange =
+                    computeConsumedSubpartitionRange(
+                            i,
+                            targetCount,
+                            numOfSubpartitionsRetriever.apply(0),
+                            isDynamicGraph,
+                            isBroadcast);
+            taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange));
+        }
+        return new VertexInputInfo(taskInputInfos);
+    }
+
+    /**
+     * Compute the consumed subpartition range for a subtask. This computation algorithm will evenly
+     * distribute subpartitions to downstream subtasks according to the number of subpartitions.
+     * Different downstream subtasks consume roughly the same number of subpartitions.
+     *
+     * @param consumerSubtaskIndex the subtask index
+     * @param numConsumers the total number of consumers
+     * @param numSubpartitions the total number of subpartitions
+     * @param isDynamicGraph whether the graph is dynamic
+     * @param isBroadcast whether the edge is broadcast
+     * @return the computed subpartition range
+     */
+    @VisibleForTesting
+    static SubpartitionIndexRange computeConsumedSubpartitionRange(
+            int consumerSubtaskIndex,
+            int numConsumers,
+            int numSubpartitions,
+            boolean isDynamicGraph,
+            boolean isBroadcast) {
+        int consumerIndex = consumerSubtaskIndex % numConsumers;
+        if (!isDynamicGraph) {
+            return new SubpartitionIndexRange(consumerIndex, consumerIndex);
+        } else {
+            if (isBroadcast) {
+                // Broadcast results have only one subpartition, which can be consumed multiple times.
+                checkArgument(numSubpartitions == 1);
+                return new SubpartitionIndexRange(0, 0);
+            } else {
+                checkArgument(consumerIndex < numConsumers);
+                checkArgument(numConsumers <= numSubpartitions);
+
+                int start = consumerIndex * numSubpartitions / numConsumers;
+                int nextStart = (consumerIndex + 1) * numSubpartitions / numConsumers;
+
+                return new SubpartitionIndexRange(start, nextStart - 1);
+            }
+        }
+    }
+
+    private static class IntermediateResultWrapper implements IntermediateResultInfo {
+        private final IntermediateResult intermediateResult;
+
+        IntermediateResultWrapper(IntermediateResult intermediateResult) {
+            this.intermediateResult = checkNotNull(intermediateResult);
+        }
+
+        @Override
+        public IntermediateDataSetID getResultId() {
+            return intermediateResult.getId();
+        }
+
+        @Override
+        public boolean isBroadcast() {
+            return intermediateResult.isBroadcast();
+        }
+
+        @Override
+        public boolean isPointwise() {
+            return intermediateResult.getConsumingDistributionPattern()
+                    == DistributionPattern.POINTWISE;
+        }
+
+        @Override
+        public int getNumPartitions() {
+            return intermediateResult.getNumberOfAssignedPartitions();
+        }
+
+        @Override
+        public int getNumSubpartitions(int partitionIndex) {
+            boolean isDynamicGraph = intermediateResult.getProducer().getGraph().isDynamic();
+            // Note that for non-dynamic graph, the num of subpartition has not been decided at
+            // this time
+            return isDynamicGraph
+                    ? intermediateResult.getPartitions()[partitionIndex].getNumberOfSubpartitions()
+                    : IntermediateResultPartition.UNKNOWN;

Review Comment:
   IntermediateResultPartition.UNKNOWN is confusing. It's better to rename it to NUM_SUBPARTITIONS_UNKNOWN.
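
   e.g. a declaration along these lines (the concrete value here is an assumption, not taken from the PR):

       /** Marker used while the number of subpartitions is not yet decided. */
       public static final int NUM_SUBPARTITIONS_UNKNOWN = -1;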



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoComputationUtils.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Util to compute {@link VertexInputInfo}s for an execution job vertex. */
+public class VertexInputInfoComputationUtils {
+
+    public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos(
+            ExecutionJobVertex ejv,
+            Function<IntermediateDataSetID, IntermediateResult> intermediateResultRetriever) {
+        checkState(ejv.isParallelismDecided());
+        final List<IntermediateResultInfo> intermediateResultInfos =
+                ejv.getJobVertex().getInputs().stream()
+                        .map(JobEdge::getSourceId)
+                        .map(intermediateResultRetriever)
+                        .map(IntermediateResultWrapper::new)
+                        .collect(Collectors.toList());
+        return computeVertexInputInfos(
+                ejv.getParallelism(), intermediateResultInfos, ejv.getGraph().isDynamic());
+    }
+
+    public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos(
+            int parallelism, List<IntermediateResultInfo> inputs, boolean isDynamicGraph) {
+
+        checkArgument(parallelism > 0);
+        final Map<IntermediateDataSetID, VertexInputInfo> vertexInputInfos = new LinkedHashMap<>();
+
+        for (IntermediateResultInfo input : inputs) {
+            int sourceParallelism = input.getNumPartitions();
+
+            if (input.isPointwise()) {
+                vertexInputInfos.putIfAbsent(
+                        input.getResultId(),
+                        computeVertexInputInfoForPointwise(
+                                sourceParallelism,
+                                parallelism,
+                                input::getNumSubpartitions,
+                                isDynamicGraph));
+            } else {
+                vertexInputInfos.putIfAbsent(
+                        input.getResultId(),
+                        computeVertexInputInfoForAllToAll(
+                                sourceParallelism,
+                                parallelism,
+                                input::getNumSubpartitions,
+                                isDynamicGraph,
+                                input.isBroadcast()));
+            }
+        }
+
+        return vertexInputInfos;
+    }
+
+    /**
+     * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#POINTWISE} edge. This
+     * computation algorithm will evenly distribute subpartitions to downstream subtasks according
+     * to the number of subpartitions. Different downstream subtasks consume roughly the same number
+     * of subpartitions.
+     *
+     * @param sourceCount the parallelism of upstream
+     * @param targetCount the parallelism of downstream
+     * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions
+     * @param isDynamicGraph whether the graph is dynamic
+     * @return the computed {@link VertexInputInfo}
+     */
+    @VisibleForTesting
+    static VertexInputInfo computeVertexInputInfoForPointwise(
+            int sourceCount,
+            int targetCount,
+            Function<Integer, Integer> numOfSubpartitionsRetriever,
+            boolean isDynamicGraph) {
+
+        final List<TaskInputInfo> taskInputInfos = new ArrayList<>();
+
+        if (sourceCount >= targetCount) {
+            for (int index = 0; index < targetCount; index++) {
+
+                int start = index * sourceCount / targetCount;
+                int end = (index + 1) * sourceCount / targetCount;
+
+                PartitionIndexRange partitionRange = new PartitionIndexRange(start, end - 1);
+                SubpartitionIndexRange subpartitionRange =
+                        computeConsumedSubpartitionRange(
+                                index,
+                                1,
+                                numOfSubpartitionsRetriever.apply(start),
+                                isDynamicGraph,
+                                false);
+                taskInputInfos.add(new TaskInputInfo(index, partitionRange, subpartitionRange));
+            }
+        } else {
+            for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {
+
+                int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;
+                int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;
+                int numConsumers = end - start;
+
+                for (int i = start; i < end; i++) {
+                    PartitionIndexRange partitionRange =
+                            new PartitionIndexRange(partitionNum, partitionNum);
+                    SubpartitionIndexRange subpartitionRange =
+                            computeConsumedSubpartitionRange(
+                                    i,
+                                    numConsumers,
+                                    numOfSubpartitionsRetriever.apply(partitionNum),
+                                    isDynamicGraph,
+                                    false);
+                    taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange));
+                }
+            }
+        }
+        return new VertexInputInfo(taskInputInfos);
+    }
+
+    /**
+     * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#ALL_TO_ALL} edge. This
+     * computation algorithm will evenly distribute subpartitions to downstream subtasks according
+     * to the number of subpartitions. Different downstream subtasks consume roughly the same number
+     * of subpartitions.
+     *
+     * @param sourceCount the parallelism of upstream
+     * @param targetCount the parallelism of downstream
+     * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions
+     * @param isDynamicGraph whether the graph is dynamic
+     * @param isBroadcast whether the edge is broadcast
+     * @return the computed {@link VertexInputInfo}
+     */
+    @VisibleForTesting
+    static VertexInputInfo computeVertexInputInfoForAllToAll(
+            int sourceCount,
+            int targetCount,
+            Function<Integer, Integer> numOfSubpartitionsRetriever,
+            boolean isDynamicGraph,
+            boolean isBroadcast) {
+        final List<TaskInputInfo> taskInputInfos = new ArrayList<>();
+        for (int i = 0; i < targetCount; ++i) {
+            PartitionIndexRange partitionRange = new PartitionIndexRange(0, sourceCount - 1);

Review Comment:
   The `partitionRange` can be created only once and reused in the loop.
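
   Roughly like this (sketch of the suggested hoisting, based on the loop in this hunk):

       // The all-to-all partition range is identical for every consumer subtask,
       // so it can be created once and shared across the loop.
       final PartitionIndexRange partitionRange = new PartitionIndexRange(0, sourceCount - 1);
       for (int i = 0; i < targetCount; ++i) {
           SubpartitionIndexRange subpartitionRange =
                   computeConsumedSubpartitionRange(
                           i,
                           targetCount,
                           numOfSubpartitionsRetriever.apply(0),
                           isDynamicGraph,
                           isBroadcast);
           taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange));
       }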



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInputInfo.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class describes the input information (partitions and subpartitions that belong to the
+ * same intermediate result) of a subtask.
+ */
+public class TaskInputInfo {
+    private final int consumerIndex;
+
+    private final PartitionIndexRange partitionIndexRange;
+
+    private final SubpartitionIndexRange subpartitionIndexRange;
+
+    public TaskInputInfo(
+            final int consumerIndex,
+            final PartitionIndexRange partitionIndexRange,
+            final SubpartitionIndexRange subpartitionIndexRange) {
+        this.consumerIndex = consumerIndex;
+        this.partitionIndexRange = checkNotNull(partitionIndexRange);
+        this.subpartitionIndexRange = checkNotNull(subpartitionIndexRange);
+    }
+
+    /** Get the subpartition range this subtask should consume. */
+    public SubpartitionIndexRange getSubpartitionIndexRange() {
+        return checkNotNull(subpartitionIndexRange);
+    }
+
+    /** Get the partition range this subtask should consume. */
+    public PartitionIndexRange getPartitionIndexRange() {
+        return checkNotNull(partitionIndexRange);

Review Comment:
   No need to do checkNotNull because it's already checked in the constructor and the field is immutable.





[GitHub] [flink] wanglijie95 commented on a diff in pull request #21162: [FLINK-29665][runtime] Support flexible subpartition range division

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #21162:
URL: https://github.com/apache/flink/pull/21162#discussion_r1058767679


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoComputationUtils.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Util to compute {@link VertexInputInfo}s for an execution job vertex. */
+public class VertexInputInfoComputationUtils {
+
+    public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos(
+            ExecutionJobVertex ejv,
+            Function<IntermediateDataSetID, IntermediateResult> intermediateResultRetriever) {
+        checkState(ejv.isParallelismDecided());
+        final List<IntermediateResultInfo> intermediateResultInfos =
+                ejv.getJobVertex().getInputs().stream()
+                        .map(JobEdge::getSourceId)
+                        .map(intermediateResultRetriever)
+                        .map(IntermediateResultWrapper::new)
+                        .collect(Collectors.toList());
+        return computeVertexInputInfos(
+                ejv.getParallelism(), intermediateResultInfos, ejv.getGraph().isDynamic());
+    }
+
+    public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos(
+            int parallelism, List<IntermediateResultInfo> inputs, boolean isDynamicGraph) {
+
+        checkArgument(parallelism > 0);
+        final Map<IntermediateDataSetID, VertexInputInfo> vertexInputInfos = new LinkedHashMap<>();
+
+        for (IntermediateResultInfo input : inputs) {
+            int sourceParallelism = input.getNumPartitions();
+
+            if (input.isPointwise()) {
+                vertexInputInfos.putIfAbsent(
+                        input.getResultId(),
+                        computeVertexInputInfoForPointwise(
+                                sourceParallelism,
+                                parallelism,
+                                input::getNumSubpartitions,
+                                isDynamicGraph));
+            } else {
+                vertexInputInfos.putIfAbsent(
+                        input.getResultId(),
+                        computeVertexInputInfoForAllToAll(
+                                sourceParallelism,
+                                parallelism,
+                                input::getNumSubpartitions,
+                                isDynamicGraph,
+                                input.isBroadcast()));
+            }
+        }
+
+        return vertexInputInfos;
+    }
+
+    /**
+     * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#POINTWISE} edge. This
+     * computation algorithm will evenly distribute subpartitions to downstream subtasks according
+     * to the number of subpartitions. Different downstream subtasks consume roughly the same number
+     * of subpartitions.
+     *
+     * @param sourceCount the parallelism of upstream
+     * @param targetCount the parallelism of downstream
+     * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions
+     * @param isDynamicGraph whether the graph is dynamic
+     * @return the computed {@link VertexInputInfo}
+     */
+    @VisibleForTesting
+    static VertexInputInfo computeVertexInputInfoForPointwise(
+            int sourceCount,
+            int targetCount,
+            Function<Integer, Integer> numOfSubpartitionsRetriever,
+            boolean isDynamicGraph) {
+
+        final List<TaskInputInfo> taskInputInfos = new ArrayList<>();
+
+        if (sourceCount >= targetCount) {
+            for (int index = 0; index < targetCount; index++) {
+
+                int start = index * sourceCount / targetCount;
+                int end = (index + 1) * sourceCount / targetCount;
+
+                PartitionIndexRange partitionRange = new PartitionIndexRange(start, end - 1);
+                SubpartitionIndexRange subpartitionRange =
+                        computeConsumedSubpartitionRange(
+                                index,
+                                1,
+                                numOfSubpartitionsRetriever.apply(start),
+                                isDynamicGraph,
+                                false);
+                taskInputInfos.add(new TaskInputInfo(index, partitionRange, subpartitionRange));
+            }
+        } else {
+            for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {
+
+                int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;
+                int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;
+                int numConsumers = end - start;
+
+                for (int i = start; i < end; i++) {
+                    PartitionIndexRange partitionRange =
+                            new PartitionIndexRange(partitionNum, partitionNum);
+                    SubpartitionIndexRange subpartitionRange =
+                            computeConsumedSubpartitionRange(
+                                    i,
+                                    numConsumers,
+                                    numOfSubpartitionsRetriever.apply(partitionNum),
+                                    isDynamicGraph,
+                                    false);
+                    taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange));
+                }
+            }
+        }
+        return new VertexInputInfo(taskInputInfos);
+    }
+
+    /**
+     * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#ALL_TO_ALL} edge. This
+     * computation algorithm will evenly distribute subpartitions to downstream subtasks according
+     * to the number of subpartitions. Different downstream subtasks consume roughly the same number
+     * of subpartitions.
+     *
+     * @param sourceCount the parallelism of upstream
+     * @param targetCount the parallelism of downstream
+     * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions
+     * @param isDynamicGraph whether the graph is dynamic
+     * @param isBroadcast whether the edge is broadcast
+     * @return the computed {@link VertexInputInfo}
+     */
+    @VisibleForTesting
+    static VertexInputInfo computeVertexInputInfoForAllToAll(
+            int sourceCount,
+            int targetCount,
+            Function<Integer, Integer> numOfSubpartitionsRetriever,
+            boolean isDynamicGraph,
+            boolean isBroadcast) {
+        final List<TaskInputInfo> taskInputInfos = new ArrayList<>();
+        for (int i = 0; i < targetCount; ++i) {
+            PartitionIndexRange partitionRange = new PartitionIndexRange(0, sourceCount - 1);
+            SubpartitionIndexRange subpartitionRange =
+                    computeConsumedSubpartitionRange(
+                            i,
+                            targetCount,
+                            numOfSubpartitionsRetriever.apply(0),
+                            isDynamicGraph,
+                            isBroadcast);
+            taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange));
+        }
+        return new VertexInputInfo(taskInputInfos);
+    }
+
+    /**
+     * Compute the consumed subpartition range for a subtask. This computation algorithm will evenly
+     * distribute subpartitions to downstream subtasks according to the number of subpartitions.
+     * Different downstream subtasks consume roughly the same number of subpartitions.
+     *
+     * @param consumerSubtaskIndex the subtask index
+     * @param numConsumers the total number of consumers
+     * @param numSubpartitions the total number of subpartitions
+     * @param isDynamicGraph whether it is a dynamic graph
+     * @param isBroadcast whether the edge is broadcast
+     * @return the computed subpartition range
+     */
+    @VisibleForTesting
+    static SubpartitionIndexRange computeConsumedSubpartitionRange(
+            int consumerSubtaskIndex,
+            int numConsumers,
+            int numSubpartitions,
+            boolean isDynamicGraph,
+            boolean isBroadcast) {
+        int consumerIndex = consumerSubtaskIndex % numConsumers;
+        if (!isDynamicGraph) {
+            return new SubpartitionIndexRange(consumerIndex, consumerIndex);
+        } else {
+            if (isBroadcast) {
+                // broadcast results have only one subpartition, which can be consumed multiple times.
+                checkArgument(numSubpartitions == 1);
+                return new SubpartitionIndexRange(0, 0);
+            } else {
+                checkArgument(consumerIndex < numConsumers);
+                checkArgument(numConsumers <= numSubpartitions);
+
+                int start = consumerIndex * numSubpartitions / numConsumers;
+                int nextStart = (consumerIndex + 1) * numSubpartitions / numConsumers;
+
+                return new SubpartitionIndexRange(start, nextStart - 1);
+            }
+        }
+    }
+
+    private static class IntermediateResultWrapper implements IntermediateResultInfo {
+        private final IntermediateResult intermediateResult;
+
+        IntermediateResultWrapper(IntermediateResult intermediateResult) {
+            this.intermediateResult = checkNotNull(intermediateResult);
+        }
+
+        @Override
+        public IntermediateDataSetID getResultId() {
+            return intermediateResult.getId();
+        }
+
+        @Override
+        public boolean isBroadcast() {
+            return intermediateResult.isBroadcast();
+        }
+
+        @Override
+        public boolean isPointwise() {
+            return intermediateResult.getConsumingDistributionPattern()
+                    == DistributionPattern.POINTWISE;
+        }
+
+        @Override
+        public int getNumPartitions() {
+            return intermediateResult.getNumberOfAssignedPartitions();
+        }
+
+        @Override
+        public int getNumSubpartitions(int partitionIndex) {
+            boolean isDynamicGraph = intermediateResult.getProducer().getGraph().isDynamic();
+            // Note that for non-dynamic graph, the num of subpartition has not been decided at

Review Comment:
   Fixed
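
   For reference, the range formula in the diff above can be exercised in isolation. A minimal standalone sketch of the dynamic-graph branch of `computeConsumedSubpartitionRange` (the counts 7 and 3 are made-up example values):

   ```java
   public class SubpartitionRangeDemo {
       public static void main(String[] args) {
           int numSubpartitions = 7;
           int numConsumers = 3;
           for (int consumerIndex = 0; consumerIndex < numConsumers; consumerIndex++) {
               // Same integer arithmetic as in the diff above.
               int start = consumerIndex * numSubpartitions / numConsumers;
               int nextStart = (consumerIndex + 1) * numSubpartitions / numConsumers;
               System.out.printf("consumer %d -> subpartitions [%d, %d]%n",
                       consumerIndex, start, nextStart - 1);
           }
           // Prints [0, 1], [2, 3], [4, 6]: each consumer gets floor(7/3) or ceil(7/3)
           // subpartitions, i.e. roughly the same number, as the javadoc promises.
       }
   }
   ```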





[GitHub] [flink] zhuzhurk commented on a diff in pull request #21162: [FLINK-29665][runtime] Support flexible subpartion range division

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #21162:
URL: https://github.com/apache/flink/pull/21162#discussion_r1058681107


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoComputationUtils.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Util to compute {@link VertexInputInfo}s for execution job vertex. */
+public class VertexInputInfoComputationUtils {
+
+    public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos(
+            ExecutionJobVertex ejv,
+            Function<IntermediateDataSetID, IntermediateResult> intermediateResultRetriever) {
+        checkState(ejv.isParallelismDecided());
+        final List<IntermediateResultInfo> intermediateResultInfos =
+                ejv.getJobVertex().getInputs().stream()
+                        .map(JobEdge::getSourceId)
+                        .map(intermediateResultRetriever)
+                        .map(IntermediateResultWrapper::new)
+                        .collect(Collectors.toList());
+        return computeVertexInputInfos(
+                ejv.getParallelism(), intermediateResultInfos, ejv.getGraph().isDynamic());
+    }
+
+    public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos(
+            int parallelism, List<IntermediateResultInfo> inputs, boolean isDynamicGraph) {
+
+        checkArgument(parallelism > 0);
+        final Map<IntermediateDataSetID, VertexInputInfo> vertexInputInfos = new LinkedHashMap<>();
+
+        for (IntermediateResultInfo input : inputs) {
+            int sourceParallelism = input.getNumPartitions();
+
+            if (input.isPointwise()) {
+                vertexInputInfos.putIfAbsent(
+                        input.getResultId(),
+                        computeVertexInputInfoForPointwise(
+                                sourceParallelism,
+                                parallelism,
+                                input::getNumSubpartitions,
+                                isDynamicGraph));
+            } else {
+                vertexInputInfos.putIfAbsent(
+                        input.getResultId(),
+                        computeVertexInputInfoForAllToAll(
+                                sourceParallelism,
+                                parallelism,
+                                input::getNumSubpartitions,
+                                isDynamicGraph,
+                                input.isBroadcast()));
+            }
+        }
+
+        return vertexInputInfos;
+    }
+
+    /**
+     * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#POINTWISE} edge. This
+     * computation algorithm will evenly distribute subpartitions to downstream subtasks according
+     * to the number of subpartitions. Different downstream subtasks consume roughly the same number
+     * of subpartitions.
+     *
+     * @param sourceCount the parallelism of upstream
+     * @param targetCount the parallelism of downstream
+     * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions
+     * @param isDynamicGraph whether it is a dynamic graph
+     * @return the computed {@link VertexInputInfo}
+     */
+    @VisibleForTesting
+    static VertexInputInfo computeVertexInputInfoForPointwise(
+            int sourceCount,
+            int targetCount,
+            Function<Integer, Integer> numOfSubpartitionsRetriever,
+            boolean isDynamicGraph) {
+
+        final List<TaskInputInfo> taskInputInfos = new ArrayList<>();
+
+        if (sourceCount >= targetCount) {
+            for (int index = 0; index < targetCount; index++) {
+
+                int start = index * sourceCount / targetCount;
+                int end = (index + 1) * sourceCount / targetCount;
+
+                PartitionIndexRange partitionRange = new PartitionIndexRange(start, end - 1);
+                SubpartitionIndexRange subpartitionRange =
+                        computeConsumedSubpartitionRange(
+                                index,
+                                1,
+                                numOfSubpartitionsRetriever.apply(start),
+                                isDynamicGraph,
+                                false);
+                taskInputInfos.add(new TaskInputInfo(index, partitionRange, subpartitionRange));
+            }
+        } else {
+            for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {
+
+                int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;
+                int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;
+                int numConsumers = end - start;
+
+                for (int i = start; i < end; i++) {
+                    PartitionIndexRange partitionRange =
+                            new PartitionIndexRange(partitionNum, partitionNum);
+                    SubpartitionIndexRange subpartitionRange =
+                            computeConsumedSubpartitionRange(
+                                    i,
+                                    numConsumers,
+                                    numOfSubpartitionsRetriever.apply(partitionNum),
+                                    isDynamicGraph,
+                                    false);
+                    taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange));
+                }
+            }
+        }
+        return new VertexInputInfo(taskInputInfos);
+    }
+
+    /**
+     * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#ALL_TO_ALL} edge. This
+     * computation algorithm will evenly distribute subpartitions to downstream subtasks according
+     * to the number of subpartitions. Different downstream subtasks consume roughly the same number
+     * of subpartitions.
+     *
+     * @param sourceCount the parallelism of upstream
+     * @param targetCount the parallelism of downstream
+     * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions
+     * @param isDynamicGraph whether it is a dynamic graph
+     * @param isBroadcast whether the edge is broadcast
+     * @return the computed {@link VertexInputInfo}
+     */
+    @VisibleForTesting
+    static VertexInputInfo computeVertexInputInfoForAllToAll(
+            int sourceCount,
+            int targetCount,
+            Function<Integer, Integer> numOfSubpartitionsRetriever,
+            boolean isDynamicGraph,
+            boolean isBroadcast) {
+        final List<TaskInputInfo> taskInputInfos = new ArrayList<>();
+        for (int i = 0; i < targetCount; ++i) {
+            PartitionIndexRange partitionRange = new PartitionIndexRange(0, sourceCount - 1);
+            SubpartitionIndexRange subpartitionRange =
+                    computeConsumedSubpartitionRange(
+                            i,
+                            targetCount,
+                            numOfSubpartitionsRetriever.apply(0),
+                            isDynamicGraph,
+                            isBroadcast);
+            taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange));
+        }
+        return new VertexInputInfo(taskInputInfos);
+    }
+
+    /**
+     * Compute the consumed subpartition range for a subtask. This computation algorithm will evenly
+     * distribute subpartitions to downstream subtasks according to the number of subpartitions.
+     * Different downstream subtasks consume roughly the same number of subpartitions.
+     *
+     * @param consumerSubtaskIndex the subtask index
+     * @param numConsumers the total number of consumers
+     * @param numSubpartitions the total number of subpartitions
+     * @param isDynamicGraph whether it is a dynamic graph
+     * @param isBroadcast whether the edge is broadcast
+     * @return the computed subpartition range
+     */
+    @VisibleForTesting
+    static SubpartitionIndexRange computeConsumedSubpartitionRange(
+            int consumerSubtaskIndex,
+            int numConsumers,
+            int numSubpartitions,
+            boolean isDynamicGraph,
+            boolean isBroadcast) {
+        int consumerIndex = consumerSubtaskIndex % numConsumers;
+        if (!isDynamicGraph) {
+            return new SubpartitionIndexRange(consumerIndex, consumerIndex);
+        } else {
+            if (isBroadcast) {
+                // broadcast results have only one subpartition, which can be consumed multiple times.
+                checkArgument(numSubpartitions == 1);
+                return new SubpartitionIndexRange(0, 0);
+            } else {
+                checkArgument(consumerIndex < numConsumers);
+                checkArgument(numConsumers <= numSubpartitions);
+
+                int start = consumerIndex * numSubpartitions / numConsumers;
+                int nextStart = (consumerIndex + 1) * numSubpartitions / numConsumers;
+
+                return new SubpartitionIndexRange(start, nextStart - 1);
+            }
+        }
+    }
+
+    private static class IntermediateResultWrapper implements IntermediateResultInfo {
+        private final IntermediateResult intermediateResult;
+
+        IntermediateResultWrapper(IntermediateResult intermediateResult) {
+            this.intermediateResult = checkNotNull(intermediateResult);
+        }
+
+        @Override
+        public IntermediateDataSetID getResultId() {
+            return intermediateResult.getId();
+        }
+
+        @Override
+        public boolean isBroadcast() {
+            return intermediateResult.isBroadcast();
+        }
+
+        @Override
+        public boolean isPointwise() {
+            return intermediateResult.getConsumingDistributionPattern()
+                    == DistributionPattern.POINTWISE;
+        }
+
+        @Override
+        public int getNumPartitions() {
+            return intermediateResult.getNumberOfAssignedPartitions();
+        }
+
+        @Override
+        public int getNumSubpartitions(int partitionIndex) {
+            boolean isDynamicGraph = intermediateResult.getProducer().getGraph().isDynamic();
+            // Note that for non-dynamic graph, the num of subpartition has not been decided at

Review Comment:
   If we never expect a non-dynamic graph to invoke this method, it's better to add a check here, along with a comment explaining why.
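
   For illustration only, the suggested guard could look roughly like this (a sketch reusing the accessors visible in the diff above, not the code that was eventually merged):

   ```java
   @Override
   public int getNumSubpartitions(int partitionIndex) {
       // Sketch of the suggested precondition: this accessor is only meaningful for
       // dynamic graphs, where the number of subpartitions has already been decided.
       checkState(intermediateResult.getProducer().getGraph().isDynamic());
       return intermediateResult.getPartitions()[partitionIndex].getNumberOfSubpartitions();
   }
   ```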





[GitHub] [flink] zhuzhurk commented on a diff in pull request #21162: [FLINK-29665][runtime] Support flexible subpartion range division

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #21162:
URL: https://github.com/apache/flink/pull/21162#discussion_r1058779302


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoComputationUtils.java:
##########
@@ -255,12 +259,11 @@ public int getNumPartitions() {
 
         @Override
         public int getNumSubpartitions(int partitionIndex) {
-            boolean isDynamicGraph = intermediateResult.getProducer().getGraph().isDynamic();
-            // Note that for non-dynamic graph, the num of subpartition has not been decided at
-            // this time
-            return isDynamicGraph
-                    ? intermediateResult.getPartitions()[partitionIndex].getNumberOfSubpartitions()
-                    : IntermediateResultPartition.NUM_SUBPARTITIONS_UNKNOWN;
+            // Note that this method should only be called for dynamic graph

Review Comment:
   Better to explain why, e.g. this method is used to compute which sub-partitions a consumer vertex should consume; however, for a non-dynamic graph it is not needed, and the number of sub-partitions has not been decided at this stage because the execution edges have not been created.





[GitHub] [flink] flinkbot commented on pull request #21162: [FLINK-29665][runtime] Support flexible subpartion range division

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21162:
URL: https://github.com/apache/flink/pull/21162#issuecomment-1291747391

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a90c3f20d260ec8f0b9b3ecee2aedec908ff04d1",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a90c3f20d260ec8f0b9b3ecee2aedec908ff04d1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a90c3f20d260ec8f0b9b3ecee2aedec908ff04d1 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] wanglijie95 commented on pull request #21162: [FLINK-29665][runtime] Support flexible subpartion range division

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on PR #21162:
URL: https://github.com/apache/flink/pull/21162#issuecomment-1365198695

   Thanks for the review @zhuzhurk. I've addressed all comments; please take a look.




[GitHub] [flink] wanglijie95 closed pull request #21162: [FLINK-29665][runtime] Support flexible subpartion range division

Posted by GitBox <gi...@apache.org>.
wanglijie95 closed pull request #21162: [FLINK-29665][runtime] Support flexible subpartion range division
URL: https://github.com/apache/flink/pull/21162




[GitHub] [flink] wanglijie95 commented on a diff in pull request #21162: [FLINK-29665][runtime] Support flexible subpartion range division

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #21162:
URL: https://github.com/apache/flink/pull/21162#discussion_r1058781862


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoComputationUtils.java:
##########
@@ -255,12 +259,11 @@ public int getNumPartitions() {
 
         @Override
         public int getNumSubpartitions(int partitionIndex) {
-            boolean isDynamicGraph = intermediateResult.getProducer().getGraph().isDynamic();
-            // Note that for non-dynamic graph, the num of subpartition has not been decided at
-            // this time
-            return isDynamicGraph
-                    ? intermediateResult.getPartitions()[partitionIndex].getNumberOfSubpartitions()
-                    : IntermediateResultPartition.NUM_SUBPARTITIONS_UNKNOWN;
+            // Note that this method should only be called for dynamic graph

Review Comment:
   Added





[GitHub] [flink] wanglijie95 commented on a diff in pull request #21162: [FLINK-29665][runtime] Support flexible subpartion range division

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #21162:
URL: https://github.com/apache/flink/pull/21162#discussion_r1057242727


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoStore.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A store that contains all the {@link VertexInputInfo}s. */
+public class VertexInputInfoStore {
+
+    private final Map<JobVertexID, Map<IntermediateDataSetID, VertexInputInfo>> vertexInputInfos =
+            new HashMap<>();
+
+    /**
+     * Put a {@link VertexInputInfo}.
+     *
+     * @param jobVertexId the job vertex id
+     * @param resultId the intermediate result id
+     * @param info the {@link VertexInputInfo} to put
+     */
+    public void put(JobVertexID jobVertexId, IntermediateDataSetID resultId, VertexInputInfo info) {
+        checkNotNull(jobVertexId);
+        checkNotNull(resultId);
+        checkNotNull(info);
+
+        vertexInputInfos.compute(
+                jobVertexId,
+                (ignored, inputInfos) -> {
+                    if (inputInfos == null) {
+                        inputInfos = new HashMap<>();
+                    }
+
+                    // Note that a job vertex can have 2 inputs with the same IntermediateDataSetID

Review Comment:
   This comment is mainly to explain why we use `putIfAbsent`. I will add "if a vertex has multiple job edges connecting to the same result, their distribution pattern must be the same and therefore the VertexInputInfo will be the same" to the doc of `VertexInputInfoStore`.
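
   As a toy illustration of the `putIfAbsent` semantics being relied on here (plain `String`s stand in for the real Flink types):

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   public class PutIfAbsentDemo {
       public static void main(String[] args) {
           Map<String, String> vertexInputInfos = new LinkedHashMap<>();
           // Two job edges of one vertex may reference the same result id. Their
           // distribution pattern must be the same, so the computed info is identical
           // and the second put is simply a no-op.
           vertexInputInfos.putIfAbsent("result-1", "input-info");
           vertexInputInfos.putIfAbsent("result-1", "input-info");
           System.out.println(vertexInputInfos.size()); // 1
       }
   }
   ```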



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoComputationUtils.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Util to compute {@link VertexInputInfo}s for execution job vertex. */
+public class VertexInputInfoComputationUtils {
+
+    public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos(
+            ExecutionJobVertex ejv,
+            Function<IntermediateDataSetID, IntermediateResult> intermediateResultRetriever) {
+        checkState(ejv.isParallelismDecided());
+        final List<IntermediateResultInfo> intermediateResultInfos =
+                ejv.getJobVertex().getInputs().stream()
+                        .map(JobEdge::getSourceId)
+                        .map(intermediateResultRetriever)
+                        .map(IntermediateResultWrapper::new)
+                        .collect(Collectors.toList());
+        return computeVertexInputInfos(
+                ejv.getParallelism(), intermediateResultInfos, ejv.getGraph().isDynamic());
+    }
+
+    public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos(
+            int parallelism, List<IntermediateResultInfo> inputs, boolean isDynamicGraph) {
+
+        checkArgument(parallelism > 0);
+        final Map<IntermediateDataSetID, VertexInputInfo> vertexInputInfos = new LinkedHashMap<>();
+
+        for (IntermediateResultInfo input : inputs) {
+            int sourceParallelism = input.getNumPartitions();
+
+            if (input.isPointwise()) {
+                vertexInputInfos.putIfAbsent(
+                        input.getResultId(),
+                        computeVertexInputInfoForPointwise(
+                                sourceParallelism,
+                                parallelism,
+                                input::getNumSubpartitions,
+                                isDynamicGraph));
+            } else {
+                vertexInputInfos.putIfAbsent(
+                        input.getResultId(),
+                        computeVertexInputInfoForAllToAll(
+                                sourceParallelism,
+                                parallelism,
+                                input::getNumSubpartitions,
+                                isDynamicGraph,
+                                input.isBroadcast()));
+            }
+        }
+
+        return vertexInputInfos;
+    }
+
+    /**
+     * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#POINTWISE} edge. This
+     * computation algorithm will evenly distribute subpartitions to downstream subtasks according
+     * to the number of subpartitions. Different downstream subtasks consume roughly the same number
+     * of subpartitions.
+     *
+     * @param sourceCount the parallelism of upstream
+     * @param targetCount the parallelism of downstream
+     * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions
+     * @param isDynamicGraph whether it is a dynamic graph
+     * @return the computed {@link VertexInputInfo}
+     */
+    @VisibleForTesting
+    static VertexInputInfo computeVertexInputInfoForPointwise(
+            int sourceCount,
+            int targetCount,
+            Function<Integer, Integer> numOfSubpartitionsRetriever,
+            boolean isDynamicGraph) {
+
+        final List<TaskInputInfo> taskInputInfos = new ArrayList<>();
+
+        if (sourceCount >= targetCount) {
+            for (int index = 0; index < targetCount; index++) {
+
+                int start = index * sourceCount / targetCount;
+                int end = (index + 1) * sourceCount / targetCount;
+
+                PartitionIndexRange partitionRange = new PartitionIndexRange(start, end - 1);
+                SubpartitionIndexRange subpartitionRange =
+                        computeConsumedSubpartitionRange(
+                                index,
+                                1,
+                                numOfSubpartitionsRetriever.apply(start),
+                                isDynamicGraph,
+                                false);
+                taskInputInfos.add(new TaskInputInfo(index, partitionRange, subpartitionRange));
+            }
+        } else {
+            for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {
+
+                int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;
+                int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;
+                int numConsumers = end - start;
+
+                for (int i = start; i < end; i++) {
+                    PartitionIndexRange partitionRange =
+                            new PartitionIndexRange(partitionNum, partitionNum);
+                    SubpartitionIndexRange subpartitionRange =
+                            computeConsumedSubpartitionRange(
+                                    i,
+                                    numConsumers,
+                                    numOfSubpartitionsRetriever.apply(partitionNum),
+                                    isDynamicGraph,
+                                    false);
+                    taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange));
+                }
+            }
+        }
+        return new VertexInputInfo(taskInputInfos);
+    }
+
+    /**
+     * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#ALL_TO_ALL} edge. This
+     * computation algorithm will evenly distribute subpartitions to downstream subtasks according
+     * to the number of subpartitions. Different downstream subtasks consume roughly the same number
+     * of subpartitions.
+     *
+     * @param sourceCount the parallelism of upstream
+     * @param targetCount the parallelism of downstream
+     * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions
+     * @param isDynamicGraph whether is dynamic graph
+     * @param isBroadcast whether the edge is broadcast
+     * @return the computed {@link VertexInputInfo}
+     */
+    @VisibleForTesting
+    static VertexInputInfo computeVertexInputInfoForAllToAll(
+            int sourceCount,
+            int targetCount,
+            Function<Integer, Integer> numOfSubpartitionsRetriever,
+            boolean isDynamicGraph,
+            boolean isBroadcast) {
+        final List<TaskInputInfo> taskInputInfos = new ArrayList<>();
+        for (int i = 0; i < targetCount; ++i) {
+            PartitionIndexRange partitionRange = new PartitionIndexRange(0, sourceCount - 1);
+            SubpartitionIndexRange subpartitionRange =
+                    computeConsumedSubpartitionRange(
+                            i,
+                            targetCount,
+                            numOfSubpartitionsRetriever.apply(0),
+                            isDynamicGraph,
+                            isBroadcast);
+            taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange));
+        }
+        return new VertexInputInfo(taskInputInfos);
+    }
+
+    /**
+     * Compute the consumed subpartition range for a subtask. This computation algorithm will evenly
+     * distribute subpartitions to downstream subtasks according to the number of subpartitions.
+     * Different downstream subtasks consume roughly the same number of subpartitions.
+     *
+     * @param consumerSubtaskIndex the subtask index
+     * @param numConsumers the total number of consumers
+     * @param numSubpartitions the total number of subpartitions
+     * @param isDynamicGraph whether it is a dynamic graph
+     * @param isBroadcast whether the edge is broadcast
+     * @return the computed subpartition range
+     */
+    @VisibleForTesting
+    static SubpartitionIndexRange computeConsumedSubpartitionRange(
+            int consumerSubtaskIndex,
+            int numConsumers,
+            int numSubpartitions,
+            boolean isDynamicGraph,
+            boolean isBroadcast) {
+        int consumerIndex = consumerSubtaskIndex % numConsumers;
+        if (!isDynamicGraph) {
+            return new SubpartitionIndexRange(consumerIndex, consumerIndex);
+        } else {
+            if (isBroadcast) {
+                // broadcast results have only one subpartition, which can be consumed multiple times.
+                checkArgument(numSubpartitions == 1);
+                return new SubpartitionIndexRange(0, 0);
+            } else {
+                checkArgument(consumerIndex < numConsumers);
+                checkArgument(numConsumers <= numSubpartitions);
+
+                int start = consumerIndex * numSubpartitions / numConsumers;
+                int nextStart = (consumerIndex + 1) * numSubpartitions / numConsumers;
+
+                return new SubpartitionIndexRange(start, nextStart - 1);
+            }
+        }
+    }
+
+    private static class IntermediateResultWrapper implements IntermediateResultInfo {
+        private final IntermediateResult intermediateResult;
+
+        IntermediateResultWrapper(IntermediateResult intermediateResult) {
+            this.intermediateResult = checkNotNull(intermediateResult);
+        }
+
+        @Override
+        public IntermediateDataSetID getResultId() {
+            return intermediateResult.getId();
+        }
+
+        @Override
+        public boolean isBroadcast() {
+            return intermediateResult.isBroadcast();
+        }
+
+        @Override
+        public boolean isPointwise() {
+            return intermediateResult.getConsumingDistributionPattern()
+                    == DistributionPattern.POINTWISE;
+        }
+
+        @Override
+        public int getNumPartitions() {
+            return intermediateResult.getNumberOfAssignedPartitions();
+        }
+
+        @Override
+        public int getNumSubpartitions(int partitionIndex) {
+            boolean isDynamicGraph = intermediateResult.getProducer().getGraph().isDynamic();
+            // Note that for non-dynamic graph, the num of subpartition has not been decided at

Review Comment:
   This comment should indeed be `non-dynamic`. Currently, for a non-dynamic graph, the number of subpartitions is decided by the connection information in the `EdgeManager`. At this point, this vertex has not been connected to its predecessors (`ExecutionJobVertex#connectToPredecessors` has not been called), so the edge manager does not have the connection information, which is why we use UNKNOWN here.
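
   Separately, the pointwise branch quoted above (the `sourceCount < targetCount` case) uses ceiling division to spread consumer subtasks across partitions. A standalone sketch of that mapping, with made-up counts (2 partitions, 5 consumers):

   ```java
   public class PointwiseSplitDemo {
       public static void main(String[] args) {
           int sourceCount = 2; // upstream parallelism (example value)
           int targetCount = 5; // downstream parallelism (example value)
           for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {
               // Same ceiling-division arithmetic as in the diff above.
               int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;
               int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;
               System.out.printf("partition %d -> consumer subtasks [%d, %d]%n",
                       partitionNum, start, end - 1);
           }
           // Prints: partition 0 -> [0, 2], partition 1 -> [3, 4].
       }
   }
   ```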


