Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/26 13:37:37 UTC

[GitHub] [flink] wanglijie95 commented on a diff in pull request #21162: [FLINK-29665][runtime] Support flexible subpartion range division

wanglijie95 commented on code in PR #21162:
URL: https://github.com/apache/flink/pull/21162#discussion_r1057242727


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoStore.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A store contains all the {@link VertexInputInfo}s. */
+public class VertexInputInfoStore {
+
+    private final Map<JobVertexID, Map<IntermediateDataSetID, VertexInputInfo>> vertexInputInfos =
+            new HashMap<>();
+
+    /**
+     * Put a {@link VertexInputInfo}.
+     *
+     * @param jobVertexId the job vertex id
+     * @param resultId the intermediate result id
+     * @param info the {@link VertexInputInfo} to put
+     */
+    public void put(JobVertexID jobVertexId, IntermediateDataSetID resultId, VertexInputInfo info) {
+        checkNotNull(jobVertexId);
+        checkNotNull(resultId);
+        checkNotNull(info);
+
+        vertexInputInfos.compute(
+                jobVertexId,
+                (ignored, inputInfos) -> {
+                    if (inputInfos == null) {
+                        inputInfos = new HashMap<>();
+                    }
+
+                    // Note that a job vertex can have 2 inputs with the same IntermediateDataSetID

Review Comment:
   This comment is mainly there to explain why we use `putIfAbsent`. I will add the note "if a vertex has multiple job edges connecting to the same result, their distribution patterns must be the same and therefore the VertexInputInfo will be the same" to the Javadoc of `VertexInputInfoStore`.
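
To illustrate the `putIfAbsent` reasoning, here is a minimal sketch, not the PR's code. `InputInfoSketch` and `PutIfAbsentSketch` are hypothetical stand-ins, and result ids are plain strings instead of `IntermediateDataSetID`. It assumes, as stated above, that edges pointing at the same result share a distribution pattern, so whichever edge is seen first yields the same info as any later one.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for VertexInputInfo; it only carries a label.
final class InputInfoSketch {
    final String description;

    InputInfoSketch(String description) {
        this.description = description;
    }
}

final class PutIfAbsentSketch {

    /** Collects one info per result id; repeated ids keep the first stored info. */
    static Map<String, InputInfoSketch> collect(List<String> resultIdsOfEdges) {
        Map<String, InputInfoSketch> infos = new LinkedHashMap<>();
        for (String resultId : resultIdsOfEdges) {
            // Assumption: edges sharing a result id share a distribution pattern,
            // so the info computed for a repeated id equals the one already stored.
            infos.putIfAbsent(resultId, new InputInfoSketch("info for " + resultId));
        }
        return infos;
    }
}
```

Calling `collect` with the edge list `r1, r1, r2` leaves two entries in the map: the second edge to `r1` is skipped rather than overwriting the first.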



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexInputInfoComputationUtils.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Util to compute {@link VertexInputInfo}s for execution job vertex. */
+public class VertexInputInfoComputationUtils {
+
+    public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos(
+            ExecutionJobVertex ejv,
+            Function<IntermediateDataSetID, IntermediateResult> intermediateResultRetriever) {
+        checkState(ejv.isParallelismDecided());
+        final List<IntermediateResultInfo> intermediateResultInfos =
+                ejv.getJobVertex().getInputs().stream()
+                        .map(JobEdge::getSourceId)
+                        .map(intermediateResultRetriever)
+                        .map(IntermediateResultWrapper::new)
+                        .collect(Collectors.toList());
+        return computeVertexInputInfos(
+                ejv.getParallelism(), intermediateResultInfos, ejv.getGraph().isDynamic());
+    }
+
+    public static Map<IntermediateDataSetID, VertexInputInfo> computeVertexInputInfos(
+            int parallelism, List<IntermediateResultInfo> inputs, boolean isDynamicGraph) {
+
+        checkArgument(parallelism > 0);
+        final Map<IntermediateDataSetID, VertexInputInfo> vertexInputInfos = new LinkedHashMap<>();
+
+        for (IntermediateResultInfo input : inputs) {
+            int sourceParallelism = input.getNumPartitions();
+
+            if (input.isPointwise()) {
+                vertexInputInfos.putIfAbsent(
+                        input.getResultId(),
+                        computeVertexInputInfoForPointwise(
+                                sourceParallelism,
+                                parallelism,
+                                input::getNumSubpartitions,
+                                isDynamicGraph));
+            } else {
+                vertexInputInfos.putIfAbsent(
+                        input.getResultId(),
+                        computeVertexInputInfoForAllToAll(
+                                sourceParallelism,
+                                parallelism,
+                                input::getNumSubpartitions,
+                                isDynamicGraph,
+                                input.isBroadcast()));
+            }
+        }
+
+        return vertexInputInfos;
+    }
+
+    /**
+     * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#POINTWISE} edge. This
+     * computation algorithm will evenly distribute subpartitions to downstream subtasks according
+     * to the number of subpartitions. Different downstream subtasks consume roughly the same number
+     * of subpartitions.
+     *
+     * @param sourceCount the parallelism of upstream
+     * @param targetCount the parallelism of downstream
+     * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions
+     * @param isDynamicGraph whether is dynamic graph
+     * @return the computed {@link VertexInputInfo}
+     */
+    @VisibleForTesting
+    static VertexInputInfo computeVertexInputInfoForPointwise(
+            int sourceCount,
+            int targetCount,
+            Function<Integer, Integer> numOfSubpartitionsRetriever,
+            boolean isDynamicGraph) {
+
+        final List<TaskInputInfo> taskInputInfos = new ArrayList<>();
+
+        if (sourceCount >= targetCount) {
+            for (int index = 0; index < targetCount; index++) {
+
+                int start = index * sourceCount / targetCount;
+                int end = (index + 1) * sourceCount / targetCount;
+
+                PartitionIndexRange partitionRange = new PartitionIndexRange(start, end - 1);
+                SubpartitionIndexRange subpartitionRange =
+                        computeConsumedSubpartitionRange(
+                                index,
+                                1,
+                                numOfSubpartitionsRetriever.apply(start),
+                                isDynamicGraph,
+                                false);
+                taskInputInfos.add(new TaskInputInfo(index, partitionRange, subpartitionRange));
+            }
+        } else {
+            for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {
+
+                int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;
+                int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;
+                int numConsumers = end - start;
+
+                for (int i = start; i < end; i++) {
+                    PartitionIndexRange partitionRange =
+                            new PartitionIndexRange(partitionNum, partitionNum);
+                    SubpartitionIndexRange subpartitionRange =
+                            computeConsumedSubpartitionRange(
+                                    i,
+                                    numConsumers,
+                                    numOfSubpartitionsRetriever.apply(partitionNum),
+                                    isDynamicGraph,
+                                    false);
+                    taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange));
+                }
+            }
+        }
+        return new VertexInputInfo(taskInputInfos);
+    }
+
+    /**
+     * Compute the {@link VertexInputInfo} for a {@link DistributionPattern#ALL_TO_ALL} edge. This
+     * computation algorithm will evenly distribute subpartitions to downstream subtasks according
+     * to the number of subpartitions. Different downstream subtasks consume roughly the same number
+     * of subpartitions.
+     *
+     * @param sourceCount the parallelism of upstream
+     * @param targetCount the parallelism of downstream
+     * @param numOfSubpartitionsRetriever a retriever to get the number of subpartitions
+     * @param isDynamicGraph whether is dynamic graph
+     * @param isBroadcast whether the edge is broadcast
+     * @return the computed {@link VertexInputInfo}
+     */
+    @VisibleForTesting
+    static VertexInputInfo computeVertexInputInfoForAllToAll(
+            int sourceCount,
+            int targetCount,
+            Function<Integer, Integer> numOfSubpartitionsRetriever,
+            boolean isDynamicGraph,
+            boolean isBroadcast) {
+        final List<TaskInputInfo> taskInputInfos = new ArrayList<>();
+        for (int i = 0; i < targetCount; ++i) {
+            PartitionIndexRange partitionRange = new PartitionIndexRange(0, sourceCount - 1);
+            SubpartitionIndexRange subpartitionRange =
+                    computeConsumedSubpartitionRange(
+                            i,
+                            targetCount,
+                            numOfSubpartitionsRetriever.apply(0),
+                            isDynamicGraph,
+                            isBroadcast);
+            taskInputInfos.add(new TaskInputInfo(i, partitionRange, subpartitionRange));
+        }
+        return new VertexInputInfo(taskInputInfos);
+    }
+
+    /**
+     * Compute the consumed subpartition range for a subtask. This computation algorithm will evenly
+     * distribute subpartitions to downstream subtasks according to the number of subpartitions.
+     * Different downstream subtasks consume roughly the same number of subpartitions.
+     *
+     * @param consumerSubtaskIndex the subtask index
+     * @param numConsumers the total number of consumers
+     * @param numSubpartitions the total number of subpartitions
+     * @param isDynamicGraph whether is dynamic graph
+     * @param isBroadcast whether the edge is broadcast
+     * @return the computed subpartition range
+     */
+    @VisibleForTesting
+    static SubpartitionIndexRange computeConsumedSubpartitionRange(
+            int consumerSubtaskIndex,
+            int numConsumers,
+            int numSubpartitions,
+            boolean isDynamicGraph,
+            boolean isBroadcast) {
+        int consumerIndex = consumerSubtaskIndex % numConsumers;
+        if (!isDynamicGraph) {
+            return new SubpartitionIndexRange(consumerIndex, consumerIndex);
+        } else {
+            if (isBroadcast) {
+                // broadcast results have only one subpartition, and be consumed multiple times.
+                checkArgument(numSubpartitions == 1);
+                return new SubpartitionIndexRange(0, 0);
+            } else {
+                checkArgument(consumerIndex < numConsumers);
+                checkArgument(numConsumers <= numSubpartitions);
+
+                int start = consumerIndex * numSubpartitions / numConsumers;
+                int nextStart = (consumerIndex + 1) * numSubpartitions / numConsumers;
+
+                return new SubpartitionIndexRange(start, nextStart - 1);
+            }
+        }
+    }
+
+    private static class IntermediateResultWrapper implements IntermediateResultInfo {
+        private final IntermediateResult intermediateResult;
+
+        IntermediateResultWrapper(IntermediateResult intermediateResult) {
+            this.intermediateResult = checkNotNull(intermediateResult);
+        }
+
+        @Override
+        public IntermediateDataSetID getResultId() {
+            return intermediateResult.getId();
+        }
+
+        @Override
+        public boolean isBroadcast() {
+            return intermediateResult.isBroadcast();
+        }
+
+        @Override
+        public boolean isPointwise() {
+            return intermediateResult.getConsumingDistributionPattern()
+                    == DistributionPattern.POINTWISE;
+        }
+
+        @Override
+        public int getNumPartitions() {
+            return intermediateResult.getNumberOfAssignedPartitions();
+        }
+
+        @Override
+        public int getNumSubpartitions(int partitionIndex) {
+            boolean isDynamicGraph = intermediateResult.getProducer().getGraph().isDynamic();
+            // Note that for non-dynamic graph, the num of subpartition has not been decided at

Review Comment:
   This comment should indeed say `non-dynamic`. Currently, for a non-dynamic graph, the number of subpartitions is decided by the connection information in the `EdgeManager`. At this point the vertex has not yet been connected to its predecessors (`ExecutionJobVertex#connectToPredecessors` has not been called), so the edge manager does not have the connection information and we use UNKNOWN here.
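
A rough sketch of why an unknown subpartition count is harmless for a non-dynamic graph. It is simplified from `computeConsumedSubpartitionRange` in the diff above: the broadcast branch is omitted and the consumer index is passed in directly. The class name, the sentinel's name, and its value of `-1` are assumptions made for illustration only.

```java
final class SubpartitionRangeSketch {

    // Hypothetical sentinel meaning "not decided yet"; name and value are assumptions.
    static final int NUM_SUBPARTITIONS_UNKNOWN = -1;

    /** Returns {start, end} (both inclusive) of the subpartitions one consumer reads. */
    static int[] consumedRange(
            int consumerIndex, int numConsumers, int numSubpartitions, boolean isDynamicGraph) {
        if (!isDynamicGraph) {
            // Non-dynamic graph: the consumer reads the subpartition matching its own
            // index, so numSubpartitions is never read and may be the sentinel.
            return new int[] {consumerIndex, consumerIndex};
        }
        if (consumerIndex >= numConsumers || numConsumers > numSubpartitions) {
            throw new IllegalArgumentException("invalid consumer or subpartition count");
        }
        // Dynamic graph: split the subpartitions into near-equal contiguous ranges.
        int start = consumerIndex * numSubpartitions / numConsumers;
        int nextStart = (consumerIndex + 1) * numSubpartitions / numConsumers;
        return new int[] {start, nextStart - 1};
    }
}
```

With 10 subpartitions and 3 consumers on a dynamic graph, the ranges come out as [0, 2], [3, 5] and [6, 9]. On a non-dynamic graph, consumer 1 gets [1, 1] even when the count passed in is the sentinel.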


