You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2023/01/28 03:47:29 UTC
[flink] 06/06: [FLINK-30755][runtime] Remove legacy code for marking vertices that do not support speculative execution
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d336dad93459622be2dd619cc1d8d3709118628b
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Mon Jan 23 01:18:28 2023 +0800
[FLINK-30755][runtime] Remove legacy code for marking vertices that do not support speculative execution
---
.../executiongraph/SpeculativeExecutionVertex.java | 8 -
.../apache/flink/runtime/jobgraph/JobVertex.java | 22 ---
.../adaptivebatch/SpeculativeScheduler.java | 2 +-
.../flink/streaming/api/graph/StreamGraph.java | 16 --
.../api/graph/StreamingJobGraphGenerator.java | 22 ---
.../StreamingJobGraphGeneratorSourceSinkTest.java | 162 ---------------------
6 files changed, 1 insertion(+), 231 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
index 48171a81d57..dbd5f84a461 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
@@ -72,14 +72,6 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
this.nextInputSplitIndexToConsumeByAttempts = new HashMap<>();
}
- public boolean containsSources() {
- return getJobVertex().getJobVertex().containsSources();
- }
-
- public boolean containsSinks() {
- return getJobVertex().getJobVertex().containsSinks();
- }
-
public boolean isSupportsConcurrentExecutionAttempts() {
return getJobVertex().getJobVertex().isSupportsConcurrentExecutionAttempts();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index eb322fe72bd..442512bdd2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -149,12 +149,6 @@ public class JobVertex implements java.io.Serializable {
*/
private final List<IntermediateDataSetID> intermediateDataSetIdsToConsume = new ArrayList<>();
- /** Indicates whether this job vertex contains source operators. */
- private boolean containsSourceOperators = false;
-
- /** Indicates whether this job vertex contains sink operators. */
- private boolean containsSinkOperators = false;
-
/**
* Indicates whether this job vertex supports multiple attempts of the same subtask executing at
* the same time.
@@ -543,22 +537,6 @@ public class JobVertex implements java.io.Serializable {
return inputs.isEmpty();
}
- public void markContainsSources() {
- this.containsSourceOperators = true;
- }
-
- public boolean containsSources() {
- return containsSourceOperators;
- }
-
- public void markContainsSinks() {
- this.containsSinkOperators = true;
- }
-
- public boolean containsSinks() {
- return containsSinkOperators;
- }
-
public void setSupportsConcurrentExecutionAttempts(
boolean supportsConcurrentExecutionAttempts) {
this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
index 6d7bea3e9bf..ea26f8158b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
@@ -329,7 +329,7 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler
final SpeculativeExecutionVertex executionVertex =
getExecutionVertex(executionVertexId);
- if (executionVertex.containsSinks()) {
+ if (!executionVertex.isSupportsConcurrentExecutionAttempts()) {
continue;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 6b082ffd45d..636c4893178 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -118,7 +118,6 @@ public class StreamGraph implements Pipeline {
private Map<Integer, StreamNode> streamNodes;
private Set<Integer> sources;
private Set<Integer> sinks;
- private Set<Integer> expandedSinks;
private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
private Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>>
virtualPartitionNodes;
@@ -161,7 +160,6 @@ public class StreamGraph implements Pipeline {
iterationSourceSinkPairs = new HashSet<>();
sources = new HashSet<>();
sinks = new HashSet<>();
- expandedSinks = new HashSet<>();
slotSharingGroupResources = new HashMap<>();
}
@@ -373,16 +371,6 @@ public class StreamGraph implements Pipeline {
sinks.add(vertexID);
}
- /**
- * Register expanded sink nodes. These nodes should also be treated as sinks. But we do not add
- * them into {@link #sinks} to avoid messing up the json plan.
- *
- * @param nodeIds sink nodes to register
- */
- public void registerExpandedSinks(Collection<Integer> nodeIds) {
- expandedSinks.addAll(nodeIds);
- }
-
public <IN, OUT> void addOperator(
Integer vertexID,
@Nullable String slotSharingGroup,
@@ -895,10 +883,6 @@ public class StreamGraph implements Pipeline {
return sinks;
}
- public Collection<Integer> getExpandedSinkIds() {
- return expandedSinks;
- }
-
public Collection<StreamNode> getStreamNodes() {
return streamNodes.values();
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 2e7b332cd9d..eafe8ae128e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -239,7 +239,6 @@ public class StreamingJobGraphGenerator {
setPhysicalEdges();
- markContainsSourcesOrSinks();
markSupportingConcurrentExecutionAttempts();
setSlotSharingAndCoLocation();
@@ -1390,27 +1389,6 @@ public class StreamingJobGraphGenerator {
return upStreamVertex.getOperatorFactory();
}
- private void markContainsSourcesOrSinks() {
- for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
- final JobVertex jobVertex = entry.getValue();
- final Set<Integer> vertexOperators = new HashSet<>();
- vertexOperators.add(entry.getKey());
- if (chainedConfigs.containsKey(entry.getKey())) {
- vertexOperators.addAll(chainedConfigs.get(entry.getKey()).keySet());
- }
-
- for (int nodeId : vertexOperators) {
- if (streamGraph.getSourceIDs().contains(nodeId)) {
- jobVertex.markContainsSources();
- }
- if (streamGraph.getSinkIDs().contains(nodeId)
- || streamGraph.getExpandedSinkIds().contains(nodeId)) {
- jobVertex.markContainsSinks();
- }
- }
- }
- }
-
private void markSupportingConcurrentExecutionAttempts() {
for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
final JobVertex jobVertex = entry.getValue();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorSourceSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorSourceSinkTest.java
deleted file mode 100644
index 2f59790b4e0..00000000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorSourceSinkTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
-import org.apache.flink.streaming.util.TestExpandingSink;
-import org.apache.flink.util.TestLoggerExtension;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.util.List;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/**
- * Tests whether a generated job vertex is correctly marked as a source/sink by {@link
- * StreamingJobGraphGenerator}.
- */
-@ExtendWith(TestLoggerExtension.class)
-class StreamingJobGraphGeneratorSourceSinkTest {
-
- private StreamExecutionEnvironment env;
-
- @BeforeEach
- void setUp() {
- env = StreamExecutionEnvironment.getExecutionEnvironment();
- }
-
- @Test
- void testLegacySource() {
- env.fromElements(0, 1).map(i -> i);
-
- final List<JobVertex> verticesSorted = getJobVertices();
-
- final JobVertex sourceVertex = verticesSorted.get(0);
- assertThat(sourceVertex.containsSources()).isTrue();
- assertThat(sourceVertex.containsSinks()).isFalse();
- }
-
- @Test
- void testNewSource() {
- env.fromSequence(0, 1).map(i -> i);
-
- final List<JobVertex> verticesSorted = getJobVertices();
-
- final JobVertex sourceVertex = verticesSorted.get(0);
- assertThat(sourceVertex.containsSources()).isTrue();
- assertThat(sourceVertex.containsSinks()).isFalse();
- }
-
- @Test
- void testMultiInputSource() {
- final DataStream<Long> source1 = env.fromSequence(0, 1);
- final DataStream<Long> source2 = env.fromSequence(0, 1);
- final MultipleInputTransformation<Long> multiInputTransform =
- new MultipleInputTransformation<>(
- "multi-input-operator",
- new StreamingJobGraphGeneratorTest.UnusedOperatorFactory(),
- Types.LONG,
- env.getParallelism());
- multiInputTransform.addInput(source1.map(i -> i).getTransformation());
- multiInputTransform.addInput(source2.getTransformation());
- multiInputTransform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
- env.addOperator(multiInputTransform);
-
- final List<JobVertex> verticesSorted = getJobVertices();
-
- final JobVertex source1Vertex = verticesSorted.get(0);
- assertThat(source1Vertex.containsSources()).isTrue();
- assertThat(source1Vertex.containsSinks()).isFalse();
-
- // source-2 is chained with the multi-input vertex
- final JobVertex multiInputVertex = verticesSorted.get(1);
- assertThat(multiInputVertex.containsSources()).isTrue();
- assertThat(multiInputVertex.containsSinks()).isFalse();
- }
-
- @Test
- void testLegacySink() {
- env.fromElements(0, 1).map(i -> i).startNewChain().addSink(new SinkFunction<Integer>() {});
-
- final List<JobVertex> verticesSorted = getJobVertices();
-
- final JobVertex sinkVertex = verticesSorted.get(1);
- assertThat(sinkVertex.containsSources()).isFalse();
- assertThat(sinkVertex.containsSinks()).isTrue();
- }
-
- @Test
- void testNewSink() {
- env.fromElements(0, 1).disableChaining().sinkTo(new TestExpandingSink());
-
- final List<JobVertex> verticesSorted = getJobVertices();
-
- final JobVertex sinkVertex = verticesSorted.get(1);
- assertThat(sinkVertex.containsSources()).isFalse();
- assertThat(sinkVertex.containsSinks()).isTrue();
- }
-
- @Test
- void testNewSinkWithSinkTopology() {
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- env.fromElements(0, 1).disableChaining().sinkTo(new TestExpandingSink());
-
- final List<JobVertex> verticesSorted = getJobVertices();
-
- final JobVertex sinkWriterVertex = verticesSorted.get(1);
- assertThat(sinkWriterVertex.containsSources()).isFalse();
- assertThat(sinkWriterVertex.containsSinks()).isTrue();
-
- final JobVertex sinkCommitterVertex = verticesSorted.get(2);
- assertThat(sinkCommitterVertex.containsSources()).isFalse();
- assertThat(sinkCommitterVertex.containsSinks()).isTrue();
-
- final JobVertex sinkPostCommitterVertex = verticesSorted.get(3);
- assertThat(sinkPostCommitterVertex.containsSources()).isFalse();
- assertThat(sinkPostCommitterVertex.containsSinks()).isTrue();
- }
-
- @Test
- void testChainedSourceSink() {
- env.setParallelism(1);
- env.fromElements(0, 1).sinkTo(new TestExpandingSink());
-
- final List<JobVertex> verticesSorted = getJobVertices();
-
- final JobVertex sourceSinkVertex = verticesSorted.get(0);
- assertThat(sourceSinkVertex.containsSources()).isTrue();
- assertThat(sourceSinkVertex.containsSinks()).isTrue();
- }
-
- private List<JobVertex> getJobVertices() {
- final StreamGraph streamGraph = env.getStreamGraph();
- final JobGraph jobGraph = streamGraph.getJobGraph();
- return jobGraph.getVerticesSortedTopologicallyFromSources();
- }
-}