You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/14 14:56:45 UTC

[flink] branch release-1.16 updated (00855438f2e -> eefeb6a5391)

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 00855438f2e [FLINK-28928][tests] Add E2E test for hybrid shuffle mode.
     new 453d820879d [hotfix] Make SchedulingDownstreamTasksInBatchJobBenchmark more closely reflect real-world scheduling.
     new ed644723f96 [hotfix] Let scheduledRegions use a set backed by IdentityHashMap
     new 9afe61166da [FLINK-29101] Filter the execution vertex finished event for non-finished ConsumedPartitionGroup
     new 665e8a10a4b [FLINK-29101] Use loop instead of recursive algorithm for maybeScheduleRegions to avoid StackOverflowError.
     new eefeb6a5391 [FLINK-29101] ignore non canBePipelined consumedPartitionGroup in getAllSchedulableRegions.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../PipelinedRegionSchedulingStrategy.java         | 136 +++++++++++++--------
 ...hedulingDownstreamTasksInBatchJobBenchmark.java |  28 +++--
 2 files changed, 102 insertions(+), 62 deletions(-)


[flink] 02/05: [hotfix] Let scheduledRegions use a set backed by IdentityHashMap

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ed644723f965187aac5fa2228cc1ca70b20c803c
Author: Weijie Guo <re...@163.com>
AuthorDate: Fri Sep 2 17:59:49 2022 +0800

    [hotfix] Let scheduledRegions use a set backed by IdentityHashMap
---
 .../runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
index 134f1eda5d3..1b4bf431648 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
@@ -61,7 +61,8 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
     private final Set<ConsumedPartitionGroup> crossRegionConsumedPartitionGroups =
             Collections.newSetFromMap(new IdentityHashMap<>());
 
-    private final Set<SchedulingPipelinedRegion> scheduledRegions = new HashSet<>();
+    private final Set<SchedulingPipelinedRegion> scheduledRegions =
+            Collections.newSetFromMap(new IdentityHashMap<>());
 
     public PipelinedRegionSchedulingStrategy(
             final SchedulerOperations schedulerOperations,


[flink] 05/05: [FLINK-29101] ignore non canBePipelined consumedPartitionGroup in getAllSchedulableRegions.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eefeb6a539186e2bfe49f72d346f00474229547c
Author: Weijie Guo <re...@163.com>
AuthorDate: Mon Sep 5 17:49:04 2022 +0800

    [FLINK-29101] ignore non canBePipelined consumedPartitionGroup in getAllSchedulableRegions.
    
    This closes #20739
---
 .../scheduler/strategy/PipelinedRegionSchedulingStrategy.java        | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
index a5b0482dbb4..3ccce83a5fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
@@ -248,6 +248,11 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
                         .getOrDefault(currentRegion, Collections.emptySet())
                         .forEach(
                                 (producedPartitionGroup) -> {
+                                    if (!producedPartitionGroup
+                                            .getResultPartitionType()
+                                            .canBePipelinedConsumed()) {
+                                        return;
+                                    }
                                     // If this group has been visited, there is no need
                                     // to repeat the determination.
                                     if (visitedConsumedPartitionGroups.contains(


[flink] 03/05: [FLINK-29101] Filter the execution vertex finished event for non-finished ConsumedPartitionGroup

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9afe61166dafa5aad0812b0839e41590751cf4ca
Author: Weijie Guo <re...@163.com>
AuthorDate: Fri Sep 2 17:56:02 2022 +0800

    [FLINK-29101] Filter the execution vertex finished event for non-finished ConsumedPartitionGroup
---
 .../scheduler/strategy/PipelinedRegionSchedulingStrategy.java  | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
index 1b4bf431648..df463636810 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
@@ -161,10 +161,15 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
         }
     }
 
-    private Set<SchedulingPipelinedRegion> getDownstreamRegionsOfVertex(
+    private Set<SchedulingPipelinedRegion> getBlockingDownstreamRegionsOfVertex(
             SchedulingExecutionVertex executionVertex) {
         return IterableUtils.toStream(executionVertex.getProducedResults())
+                .filter(partition -> !partition.getResultType().canBePipelinedConsumed())
                 .flatMap(partition -> partition.getConsumedPartitionGroups().stream())
+                .filter(
+                        group ->
+                                crossRegionConsumedPartitionGroups.contains(group)
+                                        || group.areAllPartitionsFinished())
                 .flatMap(
                         partitionGroup ->
                                 partitionGroupConsumerRegions
@@ -208,7 +213,8 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
             final ExecutionVertexID executionVertexId, final ExecutionState executionState) {
         if (executionState == ExecutionState.FINISHED) {
             maybeScheduleRegions(
-                    getDownstreamRegionsOfVertex(schedulingTopology.getVertex(executionVertexId)));
+                    getBlockingDownstreamRegionsOfVertex(
+                            schedulingTopology.getVertex(executionVertexId)));
         }
     }
 


[flink] 04/05: [FLINK-29101] Use loop instead of recursive algorithm for maybeScheduleRegions to avoid StackOverflowError.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 665e8a10a4b7cd7dec1406628b1ac1a7295bbf92
Author: Weijie Guo <re...@163.com>
AuthorDate: Mon Sep 5 11:13:58 2022 +0800

    [FLINK-29101] Use loop instead of recursive algorithm for maybeScheduleRegions to avoid StackOverflowError.
---
 .../PipelinedRegionSchedulingStrategy.java         | 118 +++++++++++++--------
 1 file changed, 72 insertions(+), 46 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
index df463636810..a5b0482dbb4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
@@ -53,9 +53,9 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
     private final Map<SchedulingPipelinedRegion, List<ExecutionVertexID>> regionVerticesSorted =
             new IdentityHashMap<>();
 
-    /** All ConsumedPartitionGroups of one schedulingPipelinedRegion. */
+    /** All produced partition groups of one schedulingPipelinedRegion. */
     private final Map<SchedulingPipelinedRegion, Set<ConsumedPartitionGroup>>
-            consumedPartitionGroupsOfRegion = new IdentityHashMap<>();
+            producedPartitionGroupsOfRegion = new IdentityHashMap<>();
 
     /** The ConsumedPartitionGroups which are produced by multiple regions. */
     private final Set<ConsumedPartitionGroup> crossRegionConsumedPartitionGroups =
@@ -80,7 +80,7 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
 
         initPartitionGroupConsumerRegions();
 
-        initConsumedPartitionGroupsOfRegion();
+        initProducedPartitionGroupsOfRegion();
 
         for (SchedulingExecutionVertex vertex : schedulingTopology.getVertices()) {
             final SchedulingPipelinedRegion region =
@@ -91,18 +91,18 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
         }
     }
 
-    private void initConsumedPartitionGroupsOfRegion() {
+    private void initProducedPartitionGroupsOfRegion() {
         for (SchedulingPipelinedRegion region : schedulingTopology.getAllPipelinedRegions()) {
-            Set<ConsumedPartitionGroup> consumedPartitionGroupsSetOfRegion = new HashSet<>();
+            Set<ConsumedPartitionGroup> producedPartitionGroupsSetOfRegion = new HashSet<>();
             for (SchedulingExecutionVertex executionVertex : region.getVertices()) {
-                consumedPartitionGroupsSetOfRegion.addAll(
+                producedPartitionGroupsSetOfRegion.addAll(
                         IterableUtils.toStream(executionVertex.getProducedResults())
                                 .flatMap(
                                         partition ->
                                                 partition.getConsumedPartitionGroups().stream())
                                 .collect(Collectors.toSet()));
             }
-            consumedPartitionGroupsOfRegion.put(region, consumedPartitionGroupsSetOfRegion);
+            producedPartitionGroupsOfRegion.put(region, producedPartitionGroupsSetOfRegion);
         }
     }
 
@@ -222,64 +222,83 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
     public void onPartitionConsumable(final IntermediateResultPartitionID resultPartitionId) {}
 
     private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
-        final List<SchedulingPipelinedRegion> regionsSorted =
-                SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
-                        schedulingTopology, regions);
+        final Set<SchedulingPipelinedRegion> regionsToSchedule = new HashSet<>();
+        Set<SchedulingPipelinedRegion> nextRegions = regions;
+        while (!nextRegions.isEmpty()) {
+            nextRegions = addSchedulableAndGetNextRegions(nextRegions, regionsToSchedule);
+        }
+        // schedule regions in topological order.
+        SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
+                        schedulingTopology, regionsToSchedule)
+                .forEach(this::scheduleRegion);
+    }
 
+    private Set<SchedulingPipelinedRegion> addSchedulableAndGetNextRegions(
+            Set<SchedulingPipelinedRegion> currentRegions,
+            Set<SchedulingPipelinedRegion> regionsToSchedule) {
+        Set<SchedulingPipelinedRegion> nextRegions = new HashSet<>();
+        // cache consumedPartitionGroup's consumable status to avoid compute repeatedly.
         final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new HashMap<>();
-        final Set<SchedulingPipelinedRegion> downstreamSchedulableRegions = new HashSet<>();
-        for (SchedulingPipelinedRegion region : regionsSorted) {
-            if (maybeScheduleRegion(region, consumableStatusCache)) {
-                downstreamSchedulableRegions.addAll(
-                        consumedPartitionGroupsOfRegion.getOrDefault(region, Collections.emptySet())
-                                .stream()
-                                .flatMap(
-                                        consumedPartitionGroups ->
-                                                partitionGroupConsumerRegions
-                                                        .getOrDefault(
-                                                                consumedPartitionGroups,
-                                                                Collections.emptySet())
-                                                        .stream())
-                                .collect(Collectors.toSet()));
+        final Set<ConsumedPartitionGroup> visitedConsumedPartitionGroups = new HashSet<>();
+
+        for (SchedulingPipelinedRegion currentRegion : currentRegions) {
+            if (isRegionSchedulable(currentRegion, consumableStatusCache, regionsToSchedule)) {
+                regionsToSchedule.add(currentRegion);
+                producedPartitionGroupsOfRegion
+                        .getOrDefault(currentRegion, Collections.emptySet())
+                        .forEach(
+                                (producedPartitionGroup) -> {
+                                    // If this group has been visited, there is no need
+                                    // to repeat the determination.
+                                    if (visitedConsumedPartitionGroups.contains(
+                                            producedPartitionGroup)) {
+                                        return;
+                                    }
+                                    visitedConsumedPartitionGroups.add(producedPartitionGroup);
+                                    nextRegions.addAll(
+                                            partitionGroupConsumerRegions.getOrDefault(
+                                                    producedPartitionGroup,
+                                                    Collections.emptySet()));
+                                });
             }
         }
-
-        if (!downstreamSchedulableRegions.isEmpty()) {
-            maybeScheduleRegions(downstreamSchedulableRegions);
-        }
+        return nextRegions;
     }
 
-    private boolean maybeScheduleRegion(
+    private boolean isRegionSchedulable(
             final SchedulingPipelinedRegion region,
-            final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
-        if (scheduledRegions.contains(region)
-                || !areRegionInputsAllConsumable(region, consumableStatusCache)) {
-            return false;
-        }
+            final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache,
+            final Set<SchedulingPipelinedRegion> regionToSchedule) {
+        return !regionToSchedule.contains(region)
+                && !scheduledRegions.contains(region)
+                && areRegionInputsAllConsumable(region, consumableStatusCache, regionToSchedule);
+    }
 
+    private void scheduleRegion(final SchedulingPipelinedRegion region) {
         checkState(
                 areRegionVerticesAllInCreatedState(region),
                 "BUG: trying to schedule a region which is not in CREATED state");
-
-        schedulerOperations.allocateSlotsAndDeploy(regionVerticesSorted.get(region));
         scheduledRegions.add(region);
-        return true;
+        schedulerOperations.allocateSlotsAndDeploy(regionVerticesSorted.get(region));
     }
 
     private boolean areRegionInputsAllConsumable(
             final SchedulingPipelinedRegion region,
-            final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
+            final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache,
+            final Set<SchedulingPipelinedRegion> regionToSchedule) {
         for (ConsumedPartitionGroup consumedPartitionGroup :
                 region.getAllNonPipelinedConsumedPartitionGroups()) {
             if (crossRegionConsumedPartitionGroups.contains(consumedPartitionGroup)) {
                 if (!isDownstreamOfCrossRegionConsumedPartitionSchedulable(
-                        consumedPartitionGroup, region)) {
+                        consumedPartitionGroup, region, regionToSchedule)) {
                     return false;
                 }
             } else if (isExternalConsumedPartitionGroup(consumedPartitionGroup, region)) {
                 if (!consumableStatusCache.computeIfAbsent(
                         consumedPartitionGroup,
-                        this::isDownstreamConsumedPartitionGroupSchedulable)) {
+                        (group) ->
+                                isDownstreamConsumedPartitionGroupSchedulable(
+                                        group, regionToSchedule))) {
                     return false;
                 }
             }
@@ -288,10 +307,13 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
     }
 
     private boolean isDownstreamConsumedPartitionGroupSchedulable(
-            final ConsumedPartitionGroup consumedPartitionGroup) {
+            final ConsumedPartitionGroup consumedPartitionGroup,
+            final Set<SchedulingPipelinedRegion> regionToSchedule) {
         if (consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) {
             for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
-                if (!scheduledRegions.contains(getProducerRegion(partitionId))) {
+                SchedulingPipelinedRegion producerRegion = getProducerRegion(partitionId);
+                if (!scheduledRegions.contains(producerRegion)
+                        && !regionToSchedule.contains(producerRegion)) {
                     return false;
                 }
             }
@@ -308,12 +330,16 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
 
     private boolean isDownstreamOfCrossRegionConsumedPartitionSchedulable(
             final ConsumedPartitionGroup consumedPartitionGroup,
-            final SchedulingPipelinedRegion pipelinedRegion) {
+            final SchedulingPipelinedRegion pipelinedRegion,
+            final Set<SchedulingPipelinedRegion> regionToSchedule) {
         if (consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) {
             for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
-                if (isExternalConsumedPartition(partitionId, pipelinedRegion)
-                        && !scheduledRegions.contains(getProducerRegion(partitionId))) {
-                    return false;
+                if (isExternalConsumedPartition(partitionId, pipelinedRegion)) {
+                    SchedulingPipelinedRegion producerRegion = getProducerRegion(partitionId);
+                    if (!regionToSchedule.contains(producerRegion)
+                            && !scheduledRegions.contains(producerRegion)) {
+                        return false;
+                    }
                 }
             }
         } else {


[flink] 01/05: [hotfix] Make SchedulingDownstreamTasksInBatchJobBenchmark more closely reflect real-world scheduling.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 453d820879d488aa4098a5fd674fd6dd2bbfbec5
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Sep 14 15:15:30 2022 +0800

    [hotfix] Make SchedulingDownstreamTasksInBatchJobBenchmark more closely reflect real-world scheduling.
---
 ...hedulingDownstreamTasksInBatchJobBenchmark.java | 28 ++++++++++++----------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmark.java
index 29e17ce0d1d..b4e273441b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmark.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.scheduler.benchmark.scheduling;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
 
 /**
@@ -29,10 +28,10 @@ import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStra
  * PipelinedRegionSchedulingStrategy#onExecutionStateChange}.
  */
 public class SchedulingDownstreamTasksInBatchJobBenchmark extends SchedulingBenchmarkBase {
-
-    private ExecutionVertexID executionVertexID;
     private PipelinedRegionSchedulingStrategy schedulingStrategy;
 
+    private int parallelism;
+
     @Override
     public void setup(JobConfiguration jobConfiguration) throws Exception {
         super.setup(jobConfiguration);
@@ -40,18 +39,21 @@ public class SchedulingDownstreamTasksInBatchJobBenchmark extends SchedulingBenc
         schedulingStrategy =
                 new PipelinedRegionSchedulingStrategy(schedulerOperations, schedulingTopology);
 
-        executionVertexID =
-                executionGraph
-                        .getJobVertex(jobVertices.get(0).getID())
-                        .getTaskVertices()[0]
-                        .getID();
-        for (ExecutionVertex vertex :
-                executionGraph.getJobVertex(jobVertices.get(0).getID()).getTaskVertices()) {
-            vertex.finishAllBlockingPartitions();
-        }
+        parallelism = jobConfiguration.getParallelism();
     }
 
     public void schedulingDownstreamTasks() {
-        schedulingStrategy.onExecutionStateChange(executionVertexID, ExecutionState.FINISHED);
+        for (int i = 0; i < parallelism - 1; i++) {
+            ExecutionVertex taskVertex =
+                    executionGraph.getJobVertex(jobVertices.get(0).getID()).getTaskVertices()[i];
+            taskVertex.finishAllBlockingPartitions();
+
+            schedulingStrategy.onExecutionStateChange(taskVertex.getID(), ExecutionState.FINISHED);
+        }
+        ExecutionVertex lastVertex =
+                executionGraph.getJobVertex(jobVertices.get(0).getID())
+                        .getTaskVertices()[parallelism - 1];
+        lastVertex.finishAllBlockingPartitions();
+        schedulingStrategy.onExecutionStateChange(lastVertex.getID(), ExecutionState.FINISHED);
     }
 }