Posted to commits@flink.apache.org by xt...@apache.org on 2022/12/22 06:09:41 UTC

[flink] branch master updated (b9fb8b8a2d5 -> 4ea67f63eb1)

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from b9fb8b8a2d5 [FLINK-30357][doc] Wrong link in connector/jdbc doc.
     new b97417f294f [FLINK-30188] Change result partition consumable to all data produced.
     new 4ea67f63eb1 [FLINK-30188][coordination] Set partition finished state in ConsumedPartitionGroup for dynamic graph correctly.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../TaskDeploymentDescriptorFactory.java           |  17 ++-
 .../executiongraph/EdgeManagerBuildUtil.java       |  20 +++
 .../IntermediateResultPartition.java               |  20 ++-
 .../adapter/DefaultExecutionTopology.java          |   5 +-
 .../strategy/DefaultInputConsumableDecider.java    |   2 +-
 .../PipelinedRegionSchedulingStrategy.java         |   4 +-
 .../scheduler/strategy/ResultPartitionState.java   |   8 +-
 .../DefaultExecutionGraphConstructionTest.java     | 153 +++++++++++++++++++++
 .../IntermediateResultPartitionTest.java           |  78 ++++-------
 .../adapter/DefaultExecutionTopologyTest.java      |  20 +--
 .../DefaultInputConsumableDeciderTest.java         |   2 +-
 .../strategy/TestingSchedulingExecutionVertex.java |   2 +-
 .../strategy/TestingSchedulingResultPartition.java |   6 +-
 .../strategy/TestingSchedulingTopology.java        |   5 +-
 14 files changed, 248 insertions(+), 94 deletions(-)


[flink] 01/02: [FLINK-30188] Change result partition consumable to all data produced.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b97417f294f805135b4a80e9e5ec7e2e3d266731
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Dec 15 17:13:12 2022 +0800

    [FLINK-30188] Change result partition consumable to all data produced.
---
 .../TaskDeploymentDescriptorFactory.java           | 17 +++--
 .../IntermediateResultPartition.java               | 20 +++---
 .../adapter/DefaultExecutionTopology.java          |  5 +-
 .../strategy/DefaultInputConsumableDecider.java    |  2 +-
 .../PipelinedRegionSchedulingStrategy.java         |  4 +-
 .../scheduler/strategy/ResultPartitionState.java   |  8 +--
 .../IntermediateResultPartitionTest.java           | 78 ++++++++--------------
 .../adapter/DefaultExecutionTopologyTest.java      | 20 +++---
 .../DefaultInputConsumableDeciderTest.java         |  2 +-
 .../strategy/TestingSchedulingExecutionVertex.java |  2 +-
 .../strategy/TestingSchedulingResultPartition.java |  6 +-
 .../strategy/TestingSchedulingTopology.java        |  5 +-
 12 files changed, 75 insertions(+), 94 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
index ba10dd59de8..92752bdd107 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
@@ -353,7 +353,7 @@ public class TaskDeploymentDescriptorFactory {
         return getConsumedPartitionShuffleDescriptor(
                 consumedPartitionId,
                 consumedPartition.getResultType(),
-                consumedPartition.isConsumable(),
+                consumedPartition.hasDataAllProduced(),
                 producerState,
                 partitionDeploymentConstraint,
                 consumedPartitionDescriptor.orElse(null));
@@ -363,12 +363,12 @@ public class TaskDeploymentDescriptorFactory {
     static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(
             ResultPartitionID consumedPartitionId,
             ResultPartitionType resultPartitionType,
-            boolean isConsumable,
+            boolean hasAllDataProduced,
             ExecutionState producerState,
             PartitionLocationConstraint partitionDeploymentConstraint,
             @Nullable ResultPartitionDeploymentDescriptor consumedPartitionDescriptor) {
         // The producing task needs to be RUNNING or already FINISHED
-        if ((resultPartitionType.canBePipelinedConsumed() || isConsumable)
+        if ((resultPartitionType.canBePipelinedConsumed() || hasAllDataProduced)
                 && consumedPartitionDescriptor != null
                 && isProducerAvailable(producerState)) {
             // partition is already registered
@@ -385,14 +385,14 @@ public class TaskDeploymentDescriptorFactory {
         } else {
             // throw respective exceptions
             throw handleConsumedPartitionShuffleDescriptorErrors(
-                    consumedPartitionId, resultPartitionType, isConsumable, producerState);
+                    consumedPartitionId, resultPartitionType, hasAllDataProduced, producerState);
         }
     }
 
     private static RuntimeException handleConsumedPartitionShuffleDescriptorErrors(
             ResultPartitionID consumedPartitionId,
             ResultPartitionType resultPartitionType,
-            boolean isConsumable,
+            boolean hasAllDataProduced,
             ExecutionState producerState) {
         String msg;
         if (isProducerFailedOrCanceled(producerState)) {
@@ -405,8 +405,11 @@ public class TaskDeploymentDescriptorFactory {
             msg =
                     String.format(
                             "Trying to consume an input partition whose producer "
-                                    + "is not ready (result type: %s, partition consumable: %s, producer state: %s, partition id: %s).",
-                            resultPartitionType, isConsumable, producerState, consumedPartitionId);
+                                    + "is not ready (result type: %s, hasAllDataProduced: %s, producer state: %s, partition id: %s).",
+                            resultPartitionType,
+                            hasAllDataProduced,
+                            producerState,
+                            consumedPartitionId);
         }
         return new IllegalStateException(msg);
     }
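
For readers tracking the rename: the gate in getConsumedPartitionShuffleDescriptor reduces to the following self-contained sketch. Everything here (the local ProducerState enum, the boolean parameters) is a simplified, hypothetical stand-in for the Flink types, not the real API.

    final class ShuffleDescriptorGateSketch {

        enum ProducerState { CREATED, RUNNING, FINISHED, FAILED, CANCELED }

        // Mirrors the gate above: a registered ("known") descriptor may be
        // handed out only if the partition is readable (its type supports
        // pipelined consumption, or all of its data has been produced) and
        // the producer is RUNNING or already FINISHED.
        static boolean canUseKnownDescriptor(
                boolean canBePipelinedConsumed,
                boolean hasAllDataProduced, // the renamed flag, formerly isConsumable
                boolean descriptorRegistered,
                ProducerState producerState) {
            boolean producerAvailable =
                    producerState == ProducerState.RUNNING
                            || producerState == ProducerState.FINISHED;
            return (canBePipelinedConsumed || hasAllDataProduced)
                    && descriptorRegistered
                    && producerAvailable;
        }

        public static void main(String[] args) {
            // A finished blocking partition with a registered descriptor is usable.
            System.out.println(canUseKnownDescriptor(
                    false, true, true, ProducerState.FINISHED)); // true
            // A blocking partition that has not produced all data is not.
            System.out.println(canUseKnownDescriptor(
                    false, false, true, ProducerState.RUNNING)); // false
        }
    }
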
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index 9b9c176a3d9..97b8765a5e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -46,8 +46,8 @@ public class IntermediateResultPartition {
     /** Number of subpartitions. Initialized lazily and will not change once set. */
     private int numberOfSubpartitions = UNKNOWN;
 
-    /** Whether this partition has produced some data. */
-    private boolean hasDataProduced = false;
+    /** Whether this partition has produced all data. */
+    private boolean dataAllProduced = false;
 
     /**
      * Releasable {@link ConsumedPartitionGroup}s for this result partition. This result partition
@@ -166,16 +166,12 @@ public class IntermediateResultPartition {
         }
     }
 
-    public void markDataProduced() {
-        hasDataProduced = true;
-    }
-
-    public boolean isConsumable() {
-        return hasDataProduced;
+    public boolean hasDataAllProduced() {
+        return dataAllProduced;
     }
 
     void resetForNewExecution() {
-        if (!getResultType().canBePipelinedConsumed() && hasDataProduced) {
+        if (!getResultType().canBePipelinedConsumed() && dataAllProduced) {
             // A BLOCKING result partition with data produced means it is finished
             // Need to add the running producer count of the result on resetting it
             for (ConsumedPartitionGroup consumedPartitionGroup : getConsumedPartitionGroups()) {
@@ -183,7 +179,7 @@ public class IntermediateResultPartition {
             }
         }
         releasablePartitionGroups.clear();
-        hasDataProduced = false;
+        dataAllProduced = false;
         for (ConsumedPartitionGroup consumedPartitionGroup : getConsumedPartitionGroups()) {
             totalResult.clearCachedInformationForPartitionGroup(consumedPartitionGroup);
         }
@@ -205,12 +201,12 @@ public class IntermediateResultPartition {
         }
 
         // Sanity check to make sure a result partition cannot be marked as finished twice.
-        if (hasDataProduced) {
+        if (dataAllProduced) {
             throw new IllegalStateException(
                     "Tried to mark a finished result partition as finished.");
         }
 
-        hasDataProduced = true;
+        dataAllProduced = true;
 
         for (ConsumedPartitionGroup consumedPartitionGroup : getConsumedPartitionGroups()) {
             consumedPartitionGroup.partitionFinished();
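
A condensed toy model of the renamed flag's lifecycle, matching the hunks above (the single counter is a hypothetical stand-in for the ConsumedPartitionGroup bookkeeping, not the real class):

    final class PartitionFlagSketch {
        private boolean dataAllProduced = false;
        private int unfinishedInGroup; // assumes a single consuming group

        PartitionFlagSketch(int partitionsInGroup) {
            this.unfinishedInGroup = partitionsInGroup;
        }

        void markFinished() {
            // Sanity check, as in IntermediateResultPartition#markFinished.
            if (dataAllProduced) {
                throw new IllegalStateException(
                        "Tried to mark a finished result partition as finished.");
            }
            dataAllProduced = true;
            unfinishedInGroup--; // consumedPartitionGroup.partitionFinished()
        }

        void resetForNewExecution(boolean canBePipelinedConsumed) {
            // A finished blocking partition must count as unfinished again
            // before the flag is cleared, as in resetForNewExecution above.
            if (!canBePipelinedConsumed && dataAllProduced) {
                unfinishedInGroup++; // restore the group's unfinished count
            }
            dataAllProduced = false;
        }

        boolean hasDataAllProduced() {
            return dataAllProduced;
        }

        public static void main(String[] args) {
            PartitionFlagSketch p = new PartitionFlagSketch(1);
            p.markFinished();
            System.out.println(p.hasDataAllProduced()); // true
            p.resetForNewExecution(false); // failover of a BLOCKING partition
            System.out.println(p.hasDataAllProduced()); // false, counter restored
        }
    }
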
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
index f2fd573e613..0b12baf3ef5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
@@ -301,8 +301,9 @@ public class DefaultExecutionTopology implements SchedulingTopology {
                                                 irp.getIntermediateResult().getId(),
                                                 irp.getResultType(),
                                                 () ->
-                                                        irp.isConsumable()
-                                                                ? ResultPartitionState.CONSUMABLE
+                                                        irp.hasDataAllProduced()
+                                                                ? ResultPartitionState
+                                                                        .ALL_DATA_PRODUCED
                                                                 : ResultPartitionState.CREATED,
                                                 () ->
                                                         partitionConsumerVertexGroupsRetriever
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java
index 1308568db7f..8f3aa6695fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java
@@ -80,7 +80,7 @@ class DefaultInputConsumableDecider implements InputConsumableDecider {
         } else {
             for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
                 if (resultPartitionRetriever.apply(partitionId).getState()
-                        != ResultPartitionState.CONSUMABLE) {
+                        != ResultPartitionState.ALL_DATA_PRODUCED) {
                     return false;
                 }
             }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
index 3ccce83a5fd..c1fc02ef555 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
@@ -325,7 +325,7 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
         } else {
             for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
                 if (schedulingTopology.getResultPartition(partitionId).getState()
-                        != ResultPartitionState.CONSUMABLE) {
+                        != ResultPartitionState.ALL_DATA_PRODUCED) {
                     return false;
                 }
             }
@@ -351,7 +351,7 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
             for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
                 if (isExternalConsumedPartition(partitionId, pipelinedRegion)
                         && schedulingTopology.getResultPartition(partitionId).getState()
-                                != ResultPartitionState.CONSUMABLE) {
+                                != ResultPartitionState.ALL_DATA_PRODUCED) {
                     return false;
                 }
             }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ResultPartitionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ResultPartitionState.java
index c9904127267..3762e2dc4f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ResultPartitionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ResultPartitionState.java
@@ -24,10 +24,6 @@ public enum ResultPartitionState {
     /** Partition is just created or is just reset. */
     CREATED,
 
-    /**
-     * Partition is ready for consuming. For pipelined partition, this indicates it has data
-     * produced. For blocking partition, this indicates all result partitions in its parent result
-     * have finished.
-     */
-    CONSUMABLE
+    /** Partition has produced all data. */
+    ALL_DATA_PRODUCED
 }
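
With the rename, both scheduler call sites (DefaultInputConsumableDecider and PipelinedRegionSchedulingStrategy) test the same uniform predicate. In the shape of those loops, an assumed helper would look like the sketch below; the enum is copied locally so the snippet stands alone, and the retriever stands in for schedulingTopology.getResultPartition(...).getState().

    final class AllDataProducedSketch {
        enum ResultPartitionState { CREATED, ALL_DATA_PRODUCED } // local copy

        // A blocking ConsumedPartitionGroup is consumable only once every
        // partition in it reports ALL_DATA_PRODUCED.
        static <ID> boolean allDataProduced(
                Iterable<ID> consumedPartitionGroup,
                java.util.function.Function<ID, ResultPartitionState> stateRetriever) {
            for (ID partitionId : consumedPartitionGroup) {
                if (stateRetriever.apply(partitionId)
                        != ResultPartitionState.ALL_DATA_PRODUCED) {
                    return false;
                }
            }
            return true;
        }
    }
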
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
index aa906caf674..a7eba80440b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
@@ -55,28 +55,7 @@ public class IntermediateResultPartitionTest {
             TestingUtils.defaultExecutorExtension();
 
     @Test
-    void testPipelinedPartitionConsumable() throws Exception {
-        IntermediateResult result = createResult(ResultPartitionType.PIPELINED, 2);
-        IntermediateResultPartition partition1 = result.getPartitions()[0];
-        IntermediateResultPartition partition2 = result.getPartitions()[1];
-
-        // Not consumable on init
-        assertThat(partition1.isConsumable()).isFalse();
-        assertThat(partition2.isConsumable()).isFalse();
-
-        // Partition 1 consumable after data are produced
-        partition1.markDataProduced();
-        assertThat(partition1.isConsumable()).isTrue();
-        assertThat(partition2.isConsumable()).isFalse();
-
-        // Not consumable if failover happens
-        result.resetForNewExecution();
-        assertThat(partition1.isConsumable()).isFalse();
-        assertThat(partition2.isConsumable()).isFalse();
-    }
-
-    @Test
-    void testBlockingPartitionConsumable() throws Exception {
+    void testPartitionDataAllProduced() throws Exception {
         IntermediateResult result = createResult(ResultPartitionType.BLOCKING, 2);
         IntermediateResultPartition partition1 = result.getPartitions()[0];
         IntermediateResultPartition partition2 = result.getPartitions()[1];
@@ -84,27 +63,27 @@ public class IntermediateResultPartitionTest {
         ConsumedPartitionGroup consumedPartitionGroup =
                 partition1.getConsumedPartitionGroups().get(0);
 
-        // Not consumable on init
-        assertThat(partition1.isConsumable()).isFalse();
-        assertThat(partition2.isConsumable()).isFalse();
+        // Not all data produced on init
+        assertThat(partition1.hasDataAllProduced()).isFalse();
+        assertThat(partition2.hasDataAllProduced()).isFalse();
         assertThat(consumedPartitionGroup.areAllPartitionsFinished()).isFalse();
 
-        // Not consumable if only one partition is FINISHED
+        // Finished partition has produced all data
         partition1.markFinished();
-        assertThat(partition1.isConsumable()).isTrue();
-        assertThat(partition2.isConsumable()).isFalse();
+        assertThat(partition1.hasDataAllProduced()).isTrue();
+        assertThat(partition2.hasDataAllProduced()).isFalse();
         assertThat(consumedPartitionGroup.areAllPartitionsFinished()).isFalse();
 
-        // Consumable after all partitions are FINISHED
+        // All data produced after all partitions are FINISHED
         partition2.markFinished();
-        assertThat(partition1.isConsumable()).isTrue();
-        assertThat(partition2.isConsumable()).isTrue();
+        assertThat(partition1.hasDataAllProduced()).isTrue();
+        assertThat(partition2.hasDataAllProduced()).isTrue();
         assertThat(consumedPartitionGroup.areAllPartitionsFinished()).isTrue();
 
-        // Not consumable if failover happens
+        // Not all data produced if failover happens
         result.resetForNewExecution();
-        assertThat(partition1.isConsumable()).isFalse();
-        assertThat(partition2.isConsumable()).isFalse();
+        assertThat(partition1.hasDataAllProduced()).isFalse();
+        assertThat(partition2.hasDataAllProduced()).isFalse();
         assertThat(consumedPartitionGroup.areAllPartitionsFinished()).isFalse();
     }
 
@@ -117,38 +96,39 @@ public class IntermediateResultPartitionTest {
         ConsumedPartitionGroup consumedPartitionGroup =
                 partition1.getConsumedPartitionGroups().get(0);
 
-        // Not consumable on init
-        assertThat(partition1.isConsumable()).isFalse();
-        assertThat(partition2.isConsumable()).isFalse();
+        // Not all data produced on init
+        assertThat(partition1.hasDataAllProduced()).isFalse();
+        assertThat(partition2.hasDataAllProduced()).isFalse();
 
-        // Not consumable if partition1 is FINISHED
+        // Finished partition has produced all data
         partition1.markFinished();
         assertThat(consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isEqualTo(1);
-        assertThat(partition1.isConsumable()).isTrue();
-        assertThat(partition2.isConsumable()).isFalse();
+        assertThat(partition1.hasDataAllProduced()).isTrue();
+        assertThat(partition2.hasDataAllProduced()).isFalse();
         assertThat(consumedPartitionGroup.areAllPartitionsFinished()).isFalse();
 
-        // Reset the result and mark partition2 FINISHED, the result should still not be consumable
+        // Reset the result and mark partition2 FINISHED; partition1's hasDataAllProduced
+        // should be false and partition2's should be true
         result.resetForNewExecution();
         assertThat(consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isEqualTo(2);
         partition2.markFinished();
         assertThat(consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isEqualTo(1);
-        assertThat(partition1.isConsumable()).isFalse();
-        assertThat(partition2.isConsumable()).isTrue();
+        assertThat(partition1.hasDataAllProduced()).isFalse();
+        assertThat(partition2.hasDataAllProduced()).isTrue();
         assertThat(consumedPartitionGroup.areAllPartitionsFinished()).isFalse();
 
-        // Consumable after all partitions are FINISHED
+        // All partitions finished.
         partition1.markFinished();
         assertThat(consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isEqualTo(0);
-        assertThat(partition1.isConsumable()).isTrue();
-        assertThat(partition2.isConsumable()).isTrue();
+        assertThat(partition1.hasDataAllProduced()).isTrue();
+        assertThat(partition2.hasDataAllProduced()).isTrue();
         assertThat(consumedPartitionGroup.areAllPartitionsFinished()).isTrue();
 
-        // Not consumable again if failover happens
+        // Not all data produced and not all partitions finished again if failover happens
         result.resetForNewExecution();
         assertThat(consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isEqualTo(2);
-        assertThat(partition1.isConsumable()).isFalse();
-        assertThat(partition2.isConsumable()).isFalse();
+        assertThat(partition1.hasDataAllProduced()).isFalse();
+        assertThat(partition2.hasDataAllProduced()).isFalse();
         assertThat(consumedPartitionGroup.areAllPartitionsFinished()).isFalse();
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
index f22c3b23dd7..4b9cded28c6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
@@ -105,20 +105,24 @@ class DefaultExecutionTopologyTest {
     }
 
     @Test
-    void testResultPartitionStateSupplier() {
-        final IntermediateResultPartition intermediateResultPartition =
-                IterableUtils.toStream(executionGraph.getAllExecutionVertices())
-                        .flatMap(v -> v.getProducedPartitions().values().stream())
-                        .findAny()
-                        .get();
+    void testResultPartitionStateSupplier() throws Exception {
+        final JobVertex[] jobVertices = createJobVertices(BLOCKING);
+        executionGraph = createExecutionGraph(EXECUTOR_RESOURCE.getExecutor(), jobVertices);
+        adapter = DefaultExecutionTopology.fromExecutionGraph(executionGraph);
+
+        final ExecutionJobVertex ejv = executionGraph.getJobVertex(jobVertices[0].getID());
+        ExecutionVertex ev = ejv.getTaskVertices()[0];
+        IntermediateResultPartition intermediateResultPartition =
+                ev.getProducedPartitions().values().stream().findAny().get();
 
         final DefaultResultPartition schedulingResultPartition =
                 adapter.getResultPartition(intermediateResultPartition.getPartitionId());
 
         assertThat(schedulingResultPartition.getState()).isEqualTo(ResultPartitionState.CREATED);
 
-        intermediateResultPartition.markDataProduced();
-        assertThat(schedulingResultPartition.getState()).isEqualTo(ResultPartitionState.CONSUMABLE);
+        ev.finishAllBlockingPartitions();
+        assertThat(schedulingResultPartition.getState())
+                .isEqualTo(ResultPartitionState.ALL_DATA_PRODUCED);
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java
index 6b373e3c09d..c0184fe0061 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java
@@ -71,7 +71,7 @@ class DefaultInputConsumableDeciderTest {
                 topology.addExecutionVertices().withParallelism(2).finish();
 
         topology.connectAllToAll(producers, consumer)
-                .withResultPartitionState(ResultPartitionState.CONSUMABLE)
+                .withResultPartitionState(ResultPartitionState.ALL_DATA_PRODUCED)
                 .withResultPartitionType(ResultPartitionType.BLOCKING)
                 .finish();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
index 655f725874c..3bc9c404fae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
@@ -98,7 +98,7 @@ public class TestingSchedulingExecutionVertex implements SchedulingExecutionVert
                         consumedPartition.getResultType());
 
         consumedPartition.registerConsumedPartitionGroup(consumedPartitionGroup);
-        if (consumedPartition.getState() == ResultPartitionState.CONSUMABLE) {
+        if (consumedPartition.getState() == ResultPartitionState.ALL_DATA_PRODUCED) {
             consumedPartitionGroup.partitionFinished();
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
index 9e1fdc77ed6..b70883e6c22 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
@@ -120,7 +120,7 @@ public class TestingSchedulingResultPartition implements SchedulingResultPartiti
     void registerConsumedPartitionGroup(ConsumedPartitionGroup consumedPartitionGroup) {
         consumedPartitionGroups.add(consumedPartitionGroup);
 
-        if (getState() == ResultPartitionState.CONSUMABLE) {
+        if (getState() == ResultPartitionState.ALL_DATA_PRODUCED) {
             consumedPartitionGroup.partitionFinished();
         }
     }
@@ -133,7 +133,7 @@ public class TestingSchedulingResultPartition implements SchedulingResultPartiti
         for (ConsumedPartitionGroup consumedPartitionGroup : consumedPartitionGroups) {
             consumedPartitionGroup.partitionFinished();
         }
-        setState(ResultPartitionState.CONSUMABLE);
+        setState(ResultPartitionState.ALL_DATA_PRODUCED);
     }
 
     void setState(ResultPartitionState state) {
@@ -145,7 +145,7 @@ public class TestingSchedulingResultPartition implements SchedulingResultPartiti
         private IntermediateDataSetID intermediateDataSetId = new IntermediateDataSetID();
         private int partitionNum = 0;
         private ResultPartitionType resultPartitionType = ResultPartitionType.BLOCKING;
-        private ResultPartitionState resultPartitionState = ResultPartitionState.CONSUMABLE;
+        private ResultPartitionState resultPartitionState = ResultPartitionState.ALL_DATA_PRODUCED;
 
         Builder withIntermediateDataSetID(IntermediateDataSetID intermediateDataSetId) {
             this.intermediateDataSetId = intermediateDataSetId;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
index c681d32dc4d..ec86fa36d9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
@@ -232,7 +232,8 @@ public class TestingSchedulingTopology implements SchedulingTopology {
 
         protected ResultPartitionType resultPartitionType = ResultPartitionType.BLOCKING;
 
-        protected ResultPartitionState resultPartitionState = ResultPartitionState.CONSUMABLE;
+        protected ResultPartitionState resultPartitionState =
+                ResultPartitionState.ALL_DATA_PRODUCED;
 
         protected ProducerConsumerConnectionBuilder(
                 final List<TestingSchedulingExecutionVertex> producers,
@@ -370,7 +371,7 @@ public class TestingSchedulingTopology implements SchedulingTopology {
 
             for (TestingSchedulingResultPartition resultPartition : resultPartitions) {
                 resultPartition.registerConsumedPartitionGroup(consumedPartitionGroup);
-                if (resultPartition.getState() == ResultPartitionState.CONSUMABLE) {
+                if (resultPartition.getState() == ResultPartitionState.ALL_DATA_PRODUCED) {
                     consumedPartitionGroup.partitionFinished();
                 }
             }


[flink] 02/02: [FLINK-30188][coordination] Set partition finished state in ConsumedPartitionGroup for dynamic graph correctly.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ea67f63eb1c43d7adf07c37946b20b525fb015d
Author: Weijie Guo <re...@163.com>
AuthorDate: Thu Nov 24 21:09:58 2022 +0800

    [FLINK-30188][coordination] Set partition finished state in ConsumedPartitionGroup for dynamic graph correctly.
    
    This closes #21388
---
 .../executiongraph/EdgeManagerBuildUtil.java       |  20 +++
 .../DefaultExecutionGraphConstructionTest.java     | 153 +++++++++++++++++++++
 2 files changed, 173 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
index a0f6520843d..cb2918f4089 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -190,6 +191,10 @@ public class EdgeManagerBuildUtil {
         ConsumedPartitionGroup consumedPartitionGroup =
                 ConsumedPartitionGroup.fromSinglePartition(
                         numConsumers, consumedPartitionId, intermediateResult.getResultType());
+        finishAllDataProducedPartitions(
+                intermediateResult,
+                Collections.singletonList(consumedPartitionId),
+                consumedPartitionGroup);
         registerConsumedPartitionGroupToEdgeManager(consumedPartitionGroup, intermediateResult);
         return consumedPartitionGroup;
     }
@@ -201,10 +206,25 @@ public class EdgeManagerBuildUtil {
         ConsumedPartitionGroup consumedPartitionGroup =
                 ConsumedPartitionGroup.fromMultiplePartitions(
                         numConsumers, consumedPartitions, intermediateResult.getResultType());
+        finishAllDataProducedPartitions(
+                intermediateResult, consumedPartitions, consumedPartitionGroup);
         registerConsumedPartitionGroupToEdgeManager(consumedPartitionGroup, intermediateResult);
         return consumedPartitionGroup;
     }
 
+    private static void finishAllDataProducedPartitions(
+            IntermediateResult intermediateResult,
+            List<IntermediateResultPartitionID> consumedPartitionIds,
+            ConsumedPartitionGroup consumedPartitionGroup) {
+        for (IntermediateResultPartitionID consumedPartitionId : consumedPartitionIds) {
+            // This is needed for dynamic graphs: the consumedPartitionGroup may not
+            // yet exist when a partition becomes finished.
+            if (intermediateResult.getPartitionById(consumedPartitionId).hasDataAllProduced()) {
+                consumedPartitionGroup.partitionFinished();
+            }
+        }
+    }
+
     private static void registerConsumedPartitionGroupToEdgeManager(
             ConsumedPartitionGroup consumedPartitionGroup, IntermediateResult intermediateResult) {
         intermediateResult
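
The comment in finishAllDataProducedPartitions is the heart of the fix: in a dynamic graph, downstream job vertices are initialized lazily, so producer partitions can finish before their ConsumedPartitionGroup exists, and the group's unfinished counter would otherwise never be decremented for them. A self-contained toy model of that catch-up (all names illustrative, not the Flink classes), matching what the new tests below assert:

    import java.util.List;

    final class LateGroupSketch {
        static final class Partition {
            boolean dataAllProduced;
        }

        static final class Group {
            int unfinished;
            Group(int total) { this.unfinished = total; }
            void partitionFinished() { unfinished--; }
        }

        // The fix: a group created late starts from the total partition count,
        // then immediately catches up on partitions that already finished.
        static Group createGroup(List<Partition> partitions) {
            Group group = new Group(partitions.size());
            for (Partition p : partitions) {
                if (p.dataAllProduced) {
                    group.partitionFinished();
                }
            }
            return group;
        }

        public static void main(String[] args) {
            Partition p1 = new Partition();
            Partition p2 = new Partition();
            p1.dataAllProduced = true; // finished before the consumer vertex was initialized
            p2.dataAllProduced = true;
            System.out.println(createGroup(List.of(p1, p2)).unfinished); // 0
        }
    }
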
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
index 588a91c6fd7..47fce0af13b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
@@ -337,6 +337,159 @@ class DefaultExecutionGraphConstructionTest {
                         partition1.getPartitionId(), partition2.getPartitionId());
     }
 
+    @Test
+    void testPointWiseConsumedPartitionGroupPartitionFinished() throws Exception {
+        JobVertex v1 = new JobVertex("source");
+        JobVertex v2 = new JobVertex("sink");
+
+        v1.setParallelism(4);
+        v2.setParallelism(2);
+
+        v2.connectNewDataSetAsInput(
+                v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+
+        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
+        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
+        eg.attachJobGraph(ordered);
+
+        IntermediateResult result =
+                Objects.requireNonNull(eg.getJobVertex(v1.getID())).getProducedDataSets()[0];
+
+        IntermediateResultPartition partition1 = result.getPartitions()[0];
+        IntermediateResultPartition partition2 = result.getPartitions()[1];
+        IntermediateResultPartition partition3 = result.getPartitions()[2];
+        IntermediateResultPartition partition4 = result.getPartitions()[3];
+
+        ConsumedPartitionGroup consumedPartitionGroup1 =
+                partition1.getConsumedPartitionGroups().get(0);
+
+        ConsumedPartitionGroup consumedPartitionGroup2 =
+                partition4.getConsumedPartitionGroups().get(0);
+
+        assertThat(consumedPartitionGroup1.getNumberOfUnfinishedPartitions()).isEqualTo(2);
+        assertThat(consumedPartitionGroup2.getNumberOfUnfinishedPartitions()).isEqualTo(2);
+        partition1.markFinished();
+        partition2.markFinished();
+        assertThat(consumedPartitionGroup1.getNumberOfUnfinishedPartitions()).isZero();
+        partition3.markFinished();
+        partition4.markFinished();
+        assertThat(consumedPartitionGroup2.getNumberOfUnfinishedPartitions()).isZero();
+    }
+
+    @Test
+    void testAllToAllConsumedPartitionGroupPartitionFinished() throws Exception {
+        JobVertex v1 = new JobVertex("source");
+        JobVertex v2 = new JobVertex("sink");
+
+        v1.setParallelism(2);
+        v2.setParallelism(2);
+
+        v2.connectNewDataSetAsInput(
+                v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
+        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
+        eg.attachJobGraph(ordered);
+
+        IntermediateResult result =
+                Objects.requireNonNull(eg.getJobVertex(v1.getID())).getProducedDataSets()[0];
+
+        IntermediateResultPartition partition1 = result.getPartitions()[0];
+        IntermediateResultPartition partition2 = result.getPartitions()[1];
+
+        ConsumedPartitionGroup consumedPartitionGroup =
+                partition1.getConsumedPartitionGroups().get(0);
+
+        assertThat(consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isEqualTo(2);
+        partition1.markFinished();
+        assertThat(consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isEqualTo(1);
+        partition2.markFinished();
+        assertThat(consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isZero();
+    }
+
+    @Test
+    void testDynamicGraphAllToAllConsumedPartitionGroupPartitionFinished() throws Exception {
+        JobVertex v1 = new JobVertex("source");
+        JobVertex v2 = new JobVertex("sink");
+
+        v1.setParallelism(2);
+        v2.setParallelism(2);
+
+        v2.connectNewDataSetAsInput(
+                v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
+        ExecutionGraph eg = createDynamicExecutionGraph(ordered);
+        eg.attachJobGraph(ordered);
+
+        ExecutionJobVertex ejv1 = eg.getJobVertex(v1.getID());
+        eg.initializeJobVertex(ejv1, 0L);
+
+        IntermediateResult result =
+                Objects.requireNonNull(eg.getJobVertex(v1.getID())).getProducedDataSets()[0];
+
+        IntermediateResultPartition partition1 = result.getPartitions()[0];
+        IntermediateResultPartition partition2 = result.getPartitions()[1];
+
+        partition1.markFinished();
+        partition2.markFinished();
+
+        assertThat(partition1.getConsumedPartitionGroups()).isEmpty();
+
+        ExecutionJobVertex ejv2 = eg.getJobVertex(v2.getID());
+        eg.initializeJobVertex(ejv2, 0L);
+
+        ConsumedPartitionGroup consumedPartitionGroup =
+                partition1.getConsumedPartitionGroups().get(0);
+
+        assertThat(consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isZero();
+    }
+
+    @Test
+    void testDynamicGraphPointWiseConsumedPartitionGroupPartitionFinished() throws Exception {
+        JobVertex v1 = new JobVertex("source");
+        JobVertex v2 = new JobVertex("sink");
+
+        v1.setParallelism(4);
+        v2.setParallelism(2);
+
+        v2.connectNewDataSetAsInput(
+                v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+
+        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
+        ExecutionGraph eg = createDynamicExecutionGraph(ordered);
+        eg.attachJobGraph(ordered);
+
+        ExecutionJobVertex ejv1 = eg.getJobVertex(v1.getID());
+        eg.initializeJobVertex(ejv1, 0L);
+
+        IntermediateResult result =
+                Objects.requireNonNull(eg.getJobVertex(v1.getID())).getProducedDataSets()[0];
+
+        IntermediateResultPartition partition1 = result.getPartitions()[0];
+        IntermediateResultPartition partition2 = result.getPartitions()[1];
+        IntermediateResultPartition partition3 = result.getPartitions()[2];
+        IntermediateResultPartition partition4 = result.getPartitions()[3];
+
+        partition1.markFinished();
+        partition2.markFinished();
+        partition3.markFinished();
+        partition4.markFinished();
+
+        assertThat(partition1.getConsumedPartitionGroups()).isEmpty();
+        assertThat(partition4.getConsumedPartitionGroups()).isEmpty();
+
+        ExecutionJobVertex ejv2 = eg.getJobVertex(v2.getID());
+        eg.initializeJobVertex(ejv2, 0L);
+
+        ConsumedPartitionGroup consumedPartitionGroup1 =
+                partition1.getConsumedPartitionGroups().get(0);
+        assertThat(consumedPartitionGroup1.getNumberOfUnfinishedPartitions()).isZero();
+        ConsumedPartitionGroup consumedPartitionGroup2 =
+                partition4.getConsumedPartitionGroups().get(0);
+        assertThat(consumedPartitionGroup2.getNumberOfUnfinishedPartitions()).isZero();
+    }
+
     @Test
     void testAttachToDynamicGraph() throws Exception {
         JobVertex v1 = new JobVertex("source");