You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/12/15 02:16:35 UTC
[flink] 03/04: [FLINK-29767] Let ConsumerVertexGroup also know the type of result partition.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 022f7adeb864f32780f80d7113a0af6ddb3e10d5
Author: Weijie Guo <re...@163.com>
AuthorDate: Mon Oct 31 15:30:13 2022 +0800
[FLINK-29767] Let ConsumerVertexGroup also know the type of result partition.
---
.../executiongraph/EdgeManagerBuildUtil.java | 12 ++++++++----
.../scheduler/strategy/ConsumerVertexGroup.java | 22 +++++++++++++++++-----
.../adapter/DefaultResultPartitionTest.java | 4 +++-
.../strategy/TestingSchedulingResultPartition.java | 7 +++++--
.../strategy/TestingSchedulingTopology.java | 8 +++++---
5 files changed, 38 insertions(+), 15 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
index 8ac55b7b7a2..a0f6520843d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
@@ -99,7 +99,8 @@ public class EdgeManagerBuildUtil {
.map(ExecutionVertex::getID)
.collect(Collectors.toList());
ConsumerVertexGroup consumerVertexGroup =
- ConsumerVertexGroup.fromMultipleVertices(consumerVertices);
+ ConsumerVertexGroup.fromMultipleVertices(
+ consumerVertices, intermediateResult.getResultType());
for (IntermediateResultPartition partition : intermediateResult.getPartitions()) {
partition.addConsumers(consumerVertexGroup);
}
@@ -117,7 +118,8 @@ public class EdgeManagerBuildUtil {
IntermediateResultPartition partition = intermediateResult.getPartitions()[i];
ConsumerVertexGroup consumerVertexGroup =
- ConsumerVertexGroup.fromSingleVertex(executionVertex.getID());
+ ConsumerVertexGroup.fromSingleVertex(
+ executionVertex.getID(), intermediateResult.getResultType());
partition.addConsumers(consumerVertexGroup);
ConsumedPartitionGroup consumedPartitionGroup =
@@ -132,7 +134,8 @@ public class EdgeManagerBuildUtil {
ExecutionVertex executionVertex = taskVertices[index];
ConsumerVertexGroup consumerVertexGroup =
- ConsumerVertexGroup.fromSingleVertex(executionVertex.getID());
+ ConsumerVertexGroup.fromSingleVertex(
+ executionVertex.getID(), intermediateResult.getResultType());
int start = index * sourceCount / targetCount;
int end = (index + 1) * sourceCount / targetCount;
@@ -173,7 +176,8 @@ public class EdgeManagerBuildUtil {
}
ConsumerVertexGroup consumerVertexGroup =
- ConsumerVertexGroup.fromMultipleVertices(consumers);
+ ConsumerVertexGroup.fromMultipleVertices(
+ consumers, intermediateResult.getResultType());
partition.addConsumers(consumerVertexGroup);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumerVertexGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumerVertexGroup.java
index 10c0bdcc1db..fb8b3f1951c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumerVertexGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumerVertexGroup.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.scheduler.strategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -26,16 +28,26 @@ import java.util.List;
public class ConsumerVertexGroup implements Iterable<ExecutionVertexID> {
private final List<ExecutionVertexID> vertices;
- private ConsumerVertexGroup(List<ExecutionVertexID> vertices) {
+ private final ResultPartitionType resultPartitionType;
+
+ private ConsumerVertexGroup(
+ List<ExecutionVertexID> vertices, ResultPartitionType resultPartitionType) {
this.vertices = vertices;
+ this.resultPartitionType = resultPartitionType;
+ }
+
+ public static ConsumerVertexGroup fromMultipleVertices(
+ List<ExecutionVertexID> vertices, ResultPartitionType resultPartitionType) {
+ return new ConsumerVertexGroup(vertices, resultPartitionType);
}
- public static ConsumerVertexGroup fromMultipleVertices(List<ExecutionVertexID> vertices) {
- return new ConsumerVertexGroup(vertices);
+ public static ConsumerVertexGroup fromSingleVertex(
+ ExecutionVertexID vertex, ResultPartitionType resultPartitionType) {
+ return new ConsumerVertexGroup(Collections.singletonList(vertex), resultPartitionType);
}
- public static ConsumerVertexGroup fromSingleVertex(ExecutionVertexID vertex) {
- return new ConsumerVertexGroup(Collections.singletonList(vertex));
+ public ResultPartitionType getResultPartitionType() {
+ return resultPartitionType;
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
index 9f8c58400e3..9a1f6e7e45d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
@@ -86,7 +86,9 @@ class DefaultResultPartitionTest {
ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
consumerVertexGroups.put(
resultPartition.getId(),
- Collections.singletonList(ConsumerVertexGroup.fromSingleVertex(executionVertexId)));
+ Collections.singletonList(
+ ConsumerVertexGroup.fromSingleVertex(
+ executionVertexId, resultPartition.getResultType())));
assertThat(resultPartition.getConsumerVertexGroups()).isNotEmpty();
assertThat(resultPartition.getConsumerVertexGroups().get(0)).contains(executionVertexId);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
index 6274eecb285..9e1fdc77ed6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
@@ -102,14 +102,17 @@ public class TestingSchedulingResultPartition implements SchedulingResultPartiti
return Collections.unmodifiableList(consumedPartitionGroups);
}
- void addConsumerGroup(Collection<TestingSchedulingExecutionVertex> consumerVertices) {
+ void addConsumerGroup(
+ Collection<TestingSchedulingExecutionVertex> consumerVertices,
+ ResultPartitionType resultPartitionType) {
checkState(this.consumerVertexGroup == null);
final ConsumerVertexGroup consumerVertexGroup =
ConsumerVertexGroup.fromMultipleVertices(
consumerVertices.stream()
.map(TestingSchedulingExecutionVertex::getId)
- .collect(Collectors.toList()));
+ .collect(Collectors.toList()),
+ resultPartitionType);
this.consumerVertexGroup = consumerVertexGroup;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
index 95596369e33..c681d32dc4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
@@ -194,7 +194,8 @@ public class TestingSchedulingTopology implements SchedulingTopology {
.withResultPartitionType(resultPartitionType)
.build();
- resultPartition.addConsumerGroup(Collections.singleton(consumer));
+ resultPartition.addConsumerGroup(
+ Collections.singleton(consumer), resultPartition.getResultType());
resultPartition.setProducer(producer);
producer.addProducedPartition(resultPartition);
@@ -304,7 +305,8 @@ public class TestingSchedulingTopology implements SchedulingTopology {
resultPartition.setProducer(producer);
producer.addProducedPartition(resultPartition);
consumer.addConsumedPartition(resultPartition);
- resultPartition.addConsumerGroup(Collections.singleton(consumer));
+ resultPartition.addConsumerGroup(
+ Collections.singleton(consumer), resultPartitionType);
resultPartitions.add(resultPartition);
}
@@ -344,7 +346,7 @@ public class TestingSchedulingTopology implements SchedulingTopology {
resultPartition.setProducer(producer);
producer.addProducedPartition(resultPartition);
- resultPartition.addConsumerGroup(consumers);
+ resultPartition.addConsumerGroup(consumers, resultPartitionType);
resultPartitions.add(resultPartition);
}