Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/14 11:28:18 UTC

[GitHub] [flink] zhuzhurk commented on a diff in pull request #21111: [FLINK-29664][runtime] Collect subpartition sizes of blocking result partitions

zhuzhurk commented on code in PR #21111:
URL: https://github.com/apache/flink/pull/21111#discussion_r1048325616


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/PointwiseBlockingResultInfo.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Information of Pointwise result. */
+public class PointwiseBlockingResultInfo extends AbstractBlockingResultInfo {
+    PointwiseBlockingResultInfo(
+            IntermediateDataSetID resultId, int numOfPartitions, int numOfSubpartitions) {
+        super(resultId, numOfPartitions, numOfSubpartitions);
+    }
+
+    @Override
+    public boolean isBroadcast() {
+        return false;
+    }
+
+    @Override
+    public boolean isPointwise() {
+        return true;
+    }
+
+    @Override
+    public long getNumBytesProduced() {
+        checkState(
+                subpartitionBytesByPartitionIndex.size() == numOfPartitions,
+                "Not all partition infos is ready");

Review Comment:
   is -> are
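The hunk is cut off right after the `checkState`, so the lines that follow it are not shown. Below is a minimal sketch of how the guarded total might be computed, with the corrected message. It assumes the base class keeps a `Map<Integer, long[]>` from partition index to per-subpartition byte counts. The class `PointwiseBytesSketch` and its summation are hypothetical and are not the PR's actual code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for pointwise byte accounting; not Flink's actual class. */
final class PointwiseBytesSketch {
    private final int numOfPartitions;

    // Assumed layout: partition index -> bytes written to each of its subpartitions.
    private final Map<Integer, long[]> subpartitionBytesByPartitionIndex = new HashMap<>();

    PointwiseBytesSketch(int numOfPartitions) {
        this.numOfPartitions = numOfPartitions;
    }

    void recordPartitionInfo(int partitionIndex, long[] subpartitionBytes) {
        subpartitionBytesByPartitionIndex.put(partitionIndex, subpartitionBytes);
    }

    long getNumBytesProduced() {
        // Same guard as the diff's checkState, with the wording fix from the review.
        if (subpartitionBytesByPartitionIndex.size() != numOfPartitions) {
            throw new IllegalStateException("Not all partition infos are ready");
        }
        // Total bytes of the result: every subpartition of every partition, summed.
        return subpartitionBytesByPartitionIndex.values().stream()
                .flatMapToLong(Arrays::stream)
                .sum();
    }

    public static void main(String[] args) {
        PointwiseBytesSketch info = new PointwiseBytesSketch(2);
        info.recordPartitionInfo(0, new long[] {10, 20});
        info.recordPartitionInfo(1, new long[] {5, 5});
        System.out.println(info.getNumBytesProduced()); // prints 40
    }
}
```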



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AllToAllBlockingResultInfo.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Information of All-To-All result. */
+public class AllToAllBlockingResultInfo extends AbstractBlockingResultInfo {
+
+    private final boolean isBroadcast;
+
+    /**
+     * Aggregated subpartition bytes, which aggregates the subpartition bytes with the same
+     * subpartition index in different partitions. Note that we can aggregate them because they will
+     * be consumed by the same downstream task.
+     */
+    @Nullable private List<Long> aggregatedSubpartitionBytes;
+
+    AllToAllBlockingResultInfo(
+            IntermediateDataSetID resultId,
+            int numOfPartitions,
+            int numOfSubpartitions,
+            boolean isBroadcast) {
+        super(resultId, numOfPartitions, numOfSubpartitions);
+        this.isBroadcast = isBroadcast;
+    }
+
+    @Override
+    public boolean isBroadcast() {
+        return isBroadcast;
+    }
+
+    @Override
+    public boolean isPointwise() {
+        return false;
+    }
+
+    @Override
+    public long getNumBytesProduced() {
+        checkState(aggregatedSubpartitionBytes != null, "Not all partition infos is ready");
+        if (isBroadcast) {
+            return aggregatedSubpartitionBytes.get(0);
+        } else {
+            return aggregatedSubpartitionBytes.stream().reduce(0L, Long::sum);
+        }
+    }
+
+    @Override
+    public void recordPartitionInfo(int partitionIndex, ResultPartitionBytes partitionBytes) {
+        // Once all partitions are finished, we can convert the subpartition bytes to aggregated
+        // value to reduce the space usage, because the distribution of source splits does not
+        // affect the distribution of data consumed by downstream tasks of ALL_TO_ALL edges(Hashing
+        // or Rebalancing, we do not consider rare cases such as custom partitions here).
+        if (aggregatedSubpartitionBytes == null) {
+            super.recordPartitionInfo(partitionIndex, partitionBytes);
+
+            if (subpartitionBytesByPartitionIndex.size() == numOfPartitions) {
+                long[] aggregatedBytes = new long[numOfSubpartitions];
+                subpartitionBytesByPartitionIndex
+                        .values()
+                        .forEach(
+                                subpartitionBytes -> {
+                                    checkState(subpartitionBytes.length == numOfSubpartitions);
+                                    for (int i = 0; i < subpartitionBytes.length; ++i) {
+                                        aggregatedBytes[i] += subpartitionBytes[i];
+                                    }
+                                });
+                this.aggregatedSubpartitionBytes =
+                        Arrays.stream(aggregatedBytes).boxed().collect(Collectors.toList());
+                this.subpartitionBytesByPartitionIndex.clear();
+            }
+        }
+    }
+
+    @Override
+    public void resetPartitionInfo(int partitionIndex) {
+        if (aggregatedSubpartitionBytes == null) {
+            super.resetPartitionInfo(partitionIndex);
+        }
+    }
+
+    public List<Long> getAggregatedSubpartitionBytes() {
+        checkState(aggregatedSubpartitionBytes != null, "Not all partition infos is ready");

Review Comment:
   is -> are
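Here is a compact, self-contained sketch of the aggregation performed in `recordPartitionInfo`. The per-partition arrays are summed column-wise once every partition has reported, and the per-partition map is then cleared. The sketch also shows why the broadcast branch reads a single entry: presumably every subpartition of a broadcast result carries the same records. `AllToAllBytesSketch` is a hypothetical stand-in that uses plain `long[]` in place of `ResultPartitionBytes`. It is not Flink's class.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical stand-in mirroring the aggregation in the diff; not Flink's actual class. */
final class AllToAllBytesSketch {
    private final int numOfPartitions;
    private final int numOfSubpartitions;
    private final boolean isBroadcast;
    private final Map<Integer, long[]> subpartitionBytesByPartitionIndex = new HashMap<>();

    // Null until every partition has reported; then holds one total per subpartition index.
    private List<Long> aggregatedSubpartitionBytes;

    AllToAllBytesSketch(int numOfPartitions, int numOfSubpartitions, boolean isBroadcast) {
        this.numOfPartitions = numOfPartitions;
        this.numOfSubpartitions = numOfSubpartitions;
        this.isBroadcast = isBroadcast;
    }

    void recordPartitionInfo(int partitionIndex, long[] subpartitionBytes) {
        if (aggregatedSubpartitionBytes != null) {
            return; // already aggregated: further reports are ignored, as in the diff
        }
        subpartitionBytesByPartitionIndex.put(partitionIndex, subpartitionBytes);
        if (subpartitionBytesByPartitionIndex.size() == numOfPartitions) {
            // Column-wise sum: subpartition i of every partition feeds the same consumer.
            long[] aggregated = new long[numOfSubpartitions];
            for (long[] bytes : subpartitionBytesByPartitionIndex.values()) {
                if (bytes.length != numOfSubpartitions) {
                    throw new IllegalStateException("Unexpected number of subpartitions");
                }
                for (int i = 0; i < bytes.length; i++) {
                    aggregated[i] += bytes[i];
                }
            }
            aggregatedSubpartitionBytes =
                    Arrays.stream(aggregated).boxed().collect(Collectors.toList());
            subpartitionBytesByPartitionIndex.clear(); // drop per-partition detail to save memory
        }
    }

    List<Long> getAggregatedSubpartitionBytes() {
        if (aggregatedSubpartitionBytes == null) {
            throw new IllegalStateException("Not all partition infos are ready");
        }
        return aggregatedSubpartitionBytes;
    }

    long getNumBytesProduced() {
        List<Long> aggregated = getAggregatedSubpartitionBytes();
        // Broadcast: every subpartition carries the same records, so one entry is the total.
        return isBroadcast
                ? aggregated.get(0)
                : aggregated.stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        AllToAllBytesSketch hashed = new AllToAllBytesSketch(2, 3, false);
        hashed.recordPartitionInfo(0, new long[] {1, 2, 3});
        hashed.recordPartitionInfo(1, new long[] {10, 20, 30});
        System.out.println(hashed.getAggregatedSubpartitionBytes()); // prints [11, 22, 33]
        System.out.println(hashed.getNumBytesProduced()); // prints 66
    }
}
```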



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java:
##########
@@ -117,25 +139,139 @@ void testDecideParallelismForForwardTarget() throws Exception {
         // trigger source2 finished.
         transitionExecutionsState(scheduler, ExecutionState.FINISHED, source2);
         assertThat(sinkExecutionJobVertex.getParallelism()).isEqualTo(SOURCE_PARALLELISM_1);
+    }
 
-        // check that the jobGraph is updated
-        assertThat(sink.getParallelism()).isEqualTo(SOURCE_PARALLELISM_1);

Review Comment:
   Why is this check removed?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AllToAllBlockingResultInfo.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Information of All-To-All result. */
+public class AllToAllBlockingResultInfo extends AbstractBlockingResultInfo {
+
+    private final boolean isBroadcast;
+
+    /**
+     * Aggregated subpartition bytes, which aggregates the subpartition bytes with the same
+     * subpartition index in different partitions. Note that we can aggregate them because they will
+     * be consumed by the same downstream task.
+     */
+    @Nullable private List<Long> aggregatedSubpartitionBytes;
+
+    AllToAllBlockingResultInfo(
+            IntermediateDataSetID resultId,
+            int numOfPartitions,
+            int numOfSubpartitions,
+            boolean isBroadcast) {
+        super(resultId, numOfPartitions, numOfSubpartitions);
+        this.isBroadcast = isBroadcast;
+    }
+
+    @Override
+    public boolean isBroadcast() {
+        return isBroadcast;
+    }
+
+    @Override
+    public boolean isPointwise() {
+        return false;
+    }
+
+    @Override
+    public long getNumBytesProduced() {
+        checkState(aggregatedSubpartitionBytes != null, "Not all partition infos is ready");

Review Comment:
   is -> are



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -150,10 +164,62 @@ protected void startSchedulingInternal() {
     }
 
     @Override
-    protected void onTaskFinished(final Execution execution) {
+    protected void onTaskFinished(
+            final Execution execution, final TaskExecutionStateTransition taskExecutionState) {
+        Optional.ofNullable(taskExecutionState.getIOMetrics())

Review Comment:
   Can `ioMetrics` be null when a task FINISHES? If not, we should check that it is non-null in advance.
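This sketch contrasts the two options raised in the question. The first is the lenient `Optional.ofNullable(...)` shape used in the diff, which silently skips missing metrics. The second is an up-front non-null check, which fails fast if a FINISHED task ever arrives without metrics. The strict variant is only correct if metrics are in fact always present on FINISHED, and that is exactly what the reviewer is asking. `IoMetricsStub` and the method names are hypothetical. Inside Flink, `Preconditions.checkNotNull` would be the usual equivalent of `Objects.requireNonNull`.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical illustration; IoMetricsStub stands in for Flink's IOMetrics. */
final class FinishedTaskMetricsSketch {
    static final class IoMetricsStub {
        final long numBytesProduced;

        IoMetricsStub(long numBytesProduced) {
            this.numBytesProduced = numBytesProduced;
        }
    }

    private long recordedBytes;

    // Lenient variant, shaped like the diff: missing metrics are silently skipped.
    void onTaskFinishedLenient(IoMetricsStub ioMetrics) {
        Optional.ofNullable(ioMetrics).ifPresent(m -> recordedBytes += m.numBytesProduced);
    }

    // Strict variant, as the review suggests: a FINISHED task without metrics fails fast.
    void onTaskFinishedStrict(IoMetricsStub ioMetrics) {
        Objects.requireNonNull(ioMetrics, "ioMetrics must not be null for a FINISHED task");
        recordedBytes += ioMetrics.numBytesProduced;
    }

    long getRecordedBytes() {
        return recordedBytes;
    }

    public static void main(String[] args) {
        FinishedTaskMetricsSketch sketch = new FinishedTaskMetricsSketch();
        sketch.onTaskFinishedLenient(null); // no effect, no error
        sketch.onTaskFinishedStrict(new FinishedTaskMetricsSketch.IoMetricsStub(7));
        System.out.println(sketch.getRecordedBytes()); // prints 7
    }
}
```

The strict form turns a silent data gap into a visible error. This matters when the recorded bytes drive parallelism decisions.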



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -161,44 +164,62 @@ protected void startSchedulingInternal() {
     }
 
     @Override
-    public boolean updateTaskExecutionState(final TaskExecutionStateTransition taskExecutionState) {
-        if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED
-                && taskExecutionState.getIOMetrics() != null) {
-            updateResultPartitionBytesMetrics(taskExecutionState.getIOMetrics());
-        }
-        return super.updateTaskExecutionState(taskExecutionState);
-    }
-
-    @Override
-    protected void onTaskFinished(final Execution execution) {
+    protected void onTaskFinished(
+            final Execution execution, final TaskExecutionStateTransition taskExecutionState) {

Review Comment:
   It would be better to pass in only the `ioMetrics` instead of the whole `taskExecutionState`.
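This sketch shows the suggested signature change. The caller unpacks the transition once and hands the hook only the metrics it reads. The hook then declares exactly its dependency and can be exercised with a metrics object alone. All `*Stub` types and method names are hypothetical stand-ins for Flink's `Execution`, `IOMetrics` and `TaskExecutionStateTransition`. They are not the PR's code.

```java
/**
 * Hypothetical sketch; the *Stub types stand in for Flink's Execution, IOMetrics and
 * TaskExecutionStateTransition.
 */
final class NarrowParameterSketch {
    static final class IoMetricsStub {
        final long numBytesProduced;

        IoMetricsStub(long numBytesProduced) {
            this.numBytesProduced = numBytesProduced;
        }
    }

    static final class ExecutionStub {
        final String name;

        ExecutionStub(String name) {
            this.name = name;
        }
    }

    static final class TransitionStub {
        private final IoMetricsStub ioMetrics;

        TransitionStub(IoMetricsStub ioMetrics) {
            this.ioMetrics = ioMetrics;
        }

        IoMetricsStub getIOMetrics() {
            return ioMetrics;
        }
    }

    private long totalBytes;

    // Wide signature: the hook receives the whole transition but reads only its metrics.
    void onTaskFinishedWide(ExecutionStub execution, TransitionStub transition) {
        totalBytes += transition.getIOMetrics().numBytesProduced;
    }

    // Narrow signature: the hook declares exactly what it uses; `execution` is kept for
    // other scheduler bookkeeping (not shown).
    void onTaskFinishedNarrow(ExecutionStub execution, IoMetricsStub ioMetrics) {
        totalBytes += ioMetrics.numBytesProduced;
    }

    // Caller side: unpack the transition once and pass only the metrics down.
    void handleTaskFinished(ExecutionStub execution, TransitionStub transition) {
        onTaskFinishedNarrow(execution, transition.getIOMetrics());
    }

    long getTotalBytes() {
        return totalBytes;
    }

    public static void main(String[] args) {
        NarrowParameterSketch sketch = new NarrowParameterSketch();
        sketch.handleTaskFinished(new ExecutionStub("map-0"), new TransitionStub(new IoMetricsStub(5)));
        sketch.onTaskFinishedNarrow(new ExecutionStub("map-1"), new IoMetricsStub(3));
        System.out.println(sketch.getTotalBytes()); // prints 8
    }
}
```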



-- 
This is an automated message from the Apache Git Service.