You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/12/15 02:16:35 UTC

[flink] 03/04: [FLINK-29767] Let ConsumerVertexGroup also know the type of the result partition.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 022f7adeb864f32780f80d7113a0af6ddb3e10d5
Author: Weijie Guo <re...@163.com>
AuthorDate: Mon Oct 31 15:30:13 2022 +0800

    [FLINK-29767] Let ConsumerVertexGroup also know the type of the result partition.
---
 .../executiongraph/EdgeManagerBuildUtil.java       | 12 ++++++++----
 .../scheduler/strategy/ConsumerVertexGroup.java    | 22 +++++++++++++++++-----
 .../adapter/DefaultResultPartitionTest.java        |  4 +++-
 .../strategy/TestingSchedulingResultPartition.java |  7 +++++--
 .../strategy/TestingSchedulingTopology.java        |  8 +++++---
 5 files changed, 38 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
index 8ac55b7b7a2..a0f6520843d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
@@ -99,7 +99,8 @@ public class EdgeManagerBuildUtil {
                         .map(ExecutionVertex::getID)
                         .collect(Collectors.toList());
         ConsumerVertexGroup consumerVertexGroup =
-                ConsumerVertexGroup.fromMultipleVertices(consumerVertices);
+                ConsumerVertexGroup.fromMultipleVertices(
+                        consumerVertices, intermediateResult.getResultType());
         for (IntermediateResultPartition partition : intermediateResult.getPartitions()) {
             partition.addConsumers(consumerVertexGroup);
         }
@@ -117,7 +118,8 @@ public class EdgeManagerBuildUtil {
                 IntermediateResultPartition partition = intermediateResult.getPartitions()[i];
 
                 ConsumerVertexGroup consumerVertexGroup =
-                        ConsumerVertexGroup.fromSingleVertex(executionVertex.getID());
+                        ConsumerVertexGroup.fromSingleVertex(
+                                executionVertex.getID(), intermediateResult.getResultType());
                 partition.addConsumers(consumerVertexGroup);
 
                 ConsumedPartitionGroup consumedPartitionGroup =
@@ -132,7 +134,8 @@ public class EdgeManagerBuildUtil {
 
                 ExecutionVertex executionVertex = taskVertices[index];
                 ConsumerVertexGroup consumerVertexGroup =
-                        ConsumerVertexGroup.fromSingleVertex(executionVertex.getID());
+                        ConsumerVertexGroup.fromSingleVertex(
+                                executionVertex.getID(), intermediateResult.getResultType());
 
                 int start = index * sourceCount / targetCount;
                 int end = (index + 1) * sourceCount / targetCount;
@@ -173,7 +176,8 @@ public class EdgeManagerBuildUtil {
                 }
 
                 ConsumerVertexGroup consumerVertexGroup =
-                        ConsumerVertexGroup.fromMultipleVertices(consumers);
+                        ConsumerVertexGroup.fromMultipleVertices(
+                                consumers, intermediateResult.getResultType());
                 partition.addConsumers(consumerVertexGroup);
             }
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumerVertexGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumerVertexGroup.java
index 10c0bdcc1db..fb8b3f1951c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumerVertexGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumerVertexGroup.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.scheduler.strategy;
 
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -26,16 +28,26 @@ import java.util.List;
 public class ConsumerVertexGroup implements Iterable<ExecutionVertexID> {
     private final List<ExecutionVertexID> vertices;
 
-    private ConsumerVertexGroup(List<ExecutionVertexID> vertices) {
+    private final ResultPartitionType resultPartitionType;
+
+    private ConsumerVertexGroup(
+            List<ExecutionVertexID> vertices, ResultPartitionType resultPartitionType) {
         this.vertices = vertices;
+        this.resultPartitionType = resultPartitionType;
+    }
+
+    public static ConsumerVertexGroup fromMultipleVertices(
+            List<ExecutionVertexID> vertices, ResultPartitionType resultPartitionType) {
+        return new ConsumerVertexGroup(vertices, resultPartitionType);
     }
 
-    public static ConsumerVertexGroup fromMultipleVertices(List<ExecutionVertexID> vertices) {
-        return new ConsumerVertexGroup(vertices);
+    public static ConsumerVertexGroup fromSingleVertex(
+            ExecutionVertexID vertex, ResultPartitionType resultPartitionType) {
+        return new ConsumerVertexGroup(Collections.singletonList(vertex), resultPartitionType);
     }
 
-    public static ConsumerVertexGroup fromSingleVertex(ExecutionVertexID vertex) {
-        return new ConsumerVertexGroup(Collections.singletonList(vertex));
+    public ResultPartitionType getResultPartitionType() {
+        return resultPartitionType;
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
index 9f8c58400e3..9a1f6e7e45d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
@@ -86,7 +86,9 @@ class DefaultResultPartitionTest {
         ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
         consumerVertexGroups.put(
                 resultPartition.getId(),
-                Collections.singletonList(ConsumerVertexGroup.fromSingleVertex(executionVertexId)));
+                Collections.singletonList(
+                        ConsumerVertexGroup.fromSingleVertex(
+                                executionVertexId, resultPartition.getResultType())));
         assertThat(resultPartition.getConsumerVertexGroups()).isNotEmpty();
         assertThat(resultPartition.getConsumerVertexGroups().get(0)).contains(executionVertexId);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
index 6274eecb285..9e1fdc77ed6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
@@ -102,14 +102,17 @@ public class TestingSchedulingResultPartition implements SchedulingResultPartiti
         return Collections.unmodifiableList(consumedPartitionGroups);
     }
 
-    void addConsumerGroup(Collection<TestingSchedulingExecutionVertex> consumerVertices) {
+    void addConsumerGroup(
+            Collection<TestingSchedulingExecutionVertex> consumerVertices,
+            ResultPartitionType resultPartitionType) {
         checkState(this.consumerVertexGroup == null);
 
         final ConsumerVertexGroup consumerVertexGroup =
                 ConsumerVertexGroup.fromMultipleVertices(
                         consumerVertices.stream()
                                 .map(TestingSchedulingExecutionVertex::getId)
-                                .collect(Collectors.toList()));
+                                .collect(Collectors.toList()),
+                        resultPartitionType);
 
         this.consumerVertexGroup = consumerVertexGroup;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
index 95596369e33..c681d32dc4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
@@ -194,7 +194,8 @@ public class TestingSchedulingTopology implements SchedulingTopology {
                         .withResultPartitionType(resultPartitionType)
                         .build();
 
-        resultPartition.addConsumerGroup(Collections.singleton(consumer));
+        resultPartition.addConsumerGroup(
+                Collections.singleton(consumer), resultPartition.getResultType());
         resultPartition.setProducer(producer);
 
         producer.addProducedPartition(resultPartition);
@@ -304,7 +305,8 @@ public class TestingSchedulingTopology implements SchedulingTopology {
                 resultPartition.setProducer(producer);
                 producer.addProducedPartition(resultPartition);
                 consumer.addConsumedPartition(resultPartition);
-                resultPartition.addConsumerGroup(Collections.singleton(consumer));
+                resultPartition.addConsumerGroup(
+                        Collections.singleton(consumer), resultPartitionType);
                 resultPartitions.add(resultPartition);
             }
 
@@ -344,7 +346,7 @@ public class TestingSchedulingTopology implements SchedulingTopology {
                 resultPartition.setProducer(producer);
                 producer.addProducedPartition(resultPartition);
 
-                resultPartition.addConsumerGroup(consumers);
+                resultPartition.addConsumerGroup(consumers, resultPartitionType);
                 resultPartitions.add(resultPartition);
             }