You are viewing a plain-text version of this content; the link to the canonical version ("here" in the original page) was not preserved in this copy.
Posted to commits@flink.apache.org by zh...@apache.org on 2023/01/28 03:47:29 UTC

[flink] 06/06: [FLINK-30755][runtime] Remove legacy codes of marking not support speculative executions

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d336dad93459622be2dd619cc1d8d3709118628b
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Mon Jan 23 01:18:28 2023 +0800

    [FLINK-30755][runtime] Remove legacy codes of marking not support speculative executions
---
 .../executiongraph/SpeculativeExecutionVertex.java |   8 -
 .../apache/flink/runtime/jobgraph/JobVertex.java   |  22 ---
 .../adaptivebatch/SpeculativeScheduler.java        |   2 +-
 .../flink/streaming/api/graph/StreamGraph.java     |  16 --
 .../api/graph/StreamingJobGraphGenerator.java      |  22 ---
 .../StreamingJobGraphGeneratorSourceSinkTest.java  | 162 ---------------------
 6 files changed, 1 insertion(+), 231 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
index 48171a81d57..dbd5f84a461 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
@@ -72,14 +72,6 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
         this.nextInputSplitIndexToConsumeByAttempts = new HashMap<>();
     }
 
-    public boolean containsSources() {
-        return getJobVertex().getJobVertex().containsSources();
-    }
-
-    public boolean containsSinks() {
-        return getJobVertex().getJobVertex().containsSinks();
-    }
-
     public boolean isSupportsConcurrentExecutionAttempts() {
         return getJobVertex().getJobVertex().isSupportsConcurrentExecutionAttempts();
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index eb322fe72bd..442512bdd2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -149,12 +149,6 @@ public class JobVertex implements java.io.Serializable {
      */
     private final List<IntermediateDataSetID> intermediateDataSetIdsToConsume = new ArrayList<>();
 
-    /** Indicates whether this job vertex contains source operators. */
-    private boolean containsSourceOperators = false;
-
-    /** Indicates whether this job vertex contains sink operators. */
-    private boolean containsSinkOperators = false;
-
     /**
      * Indicates whether this job vertex supports multiple attempts of the same subtask executing at
      * the same time.
@@ -543,22 +537,6 @@ public class JobVertex implements java.io.Serializable {
         return inputs.isEmpty();
     }
 
-    public void markContainsSources() {
-        this.containsSourceOperators = true;
-    }
-
-    public boolean containsSources() {
-        return containsSourceOperators;
-    }
-
-    public void markContainsSinks() {
-        this.containsSinkOperators = true;
-    }
-
-    public boolean containsSinks() {
-        return containsSinkOperators;
-    }
-
     public void setSupportsConcurrentExecutionAttempts(
             boolean supportsConcurrentExecutionAttempts) {
         this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
index 6d7bea3e9bf..ea26f8158b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
@@ -329,7 +329,7 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler
             final SpeculativeExecutionVertex executionVertex =
                     getExecutionVertex(executionVertexId);
 
-            if (executionVertex.containsSinks()) {
+            if (!executionVertex.isSupportsConcurrentExecutionAttempts()) {
                 continue;
             }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 6b082ffd45d..636c4893178 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -118,7 +118,6 @@ public class StreamGraph implements Pipeline {
     private Map<Integer, StreamNode> streamNodes;
     private Set<Integer> sources;
     private Set<Integer> sinks;
-    private Set<Integer> expandedSinks;
     private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
     private Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>>
             virtualPartitionNodes;
@@ -161,7 +160,6 @@ public class StreamGraph implements Pipeline {
         iterationSourceSinkPairs = new HashSet<>();
         sources = new HashSet<>();
         sinks = new HashSet<>();
-        expandedSinks = new HashSet<>();
         slotSharingGroupResources = new HashMap<>();
     }
 
@@ -373,16 +371,6 @@ public class StreamGraph implements Pipeline {
         sinks.add(vertexID);
     }
 
-    /**
-     * Register expanded sink nodes. These nodes should also be treated as sinks. But we do not add
-     * them into {@link #sinks} to avoid messing up the json plan.
-     *
-     * @param nodeIds sink nodes to register
-     */
-    public void registerExpandedSinks(Collection<Integer> nodeIds) {
-        expandedSinks.addAll(nodeIds);
-    }
-
     public <IN, OUT> void addOperator(
             Integer vertexID,
             @Nullable String slotSharingGroup,
@@ -895,10 +883,6 @@ public class StreamGraph implements Pipeline {
         return sinks;
     }
 
-    public Collection<Integer> getExpandedSinkIds() {
-        return expandedSinks;
-    }
-
     public Collection<StreamNode> getStreamNodes() {
         return streamNodes.values();
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 2e7b332cd9d..eafe8ae128e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -239,7 +239,6 @@ public class StreamingJobGraphGenerator {
 
         setPhysicalEdges();
 
-        markContainsSourcesOrSinks();
         markSupportingConcurrentExecutionAttempts();
 
         setSlotSharingAndCoLocation();
@@ -1390,27 +1389,6 @@ public class StreamingJobGraphGenerator {
         return upStreamVertex.getOperatorFactory();
     }
 
-    private void markContainsSourcesOrSinks() {
-        for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
-            final JobVertex jobVertex = entry.getValue();
-            final Set<Integer> vertexOperators = new HashSet<>();
-            vertexOperators.add(entry.getKey());
-            if (chainedConfigs.containsKey(entry.getKey())) {
-                vertexOperators.addAll(chainedConfigs.get(entry.getKey()).keySet());
-            }
-
-            for (int nodeId : vertexOperators) {
-                if (streamGraph.getSourceIDs().contains(nodeId)) {
-                    jobVertex.markContainsSources();
-                }
-                if (streamGraph.getSinkIDs().contains(nodeId)
-                        || streamGraph.getExpandedSinkIds().contains(nodeId)) {
-                    jobVertex.markContainsSinks();
-                }
-            }
-        }
-    }
-
     private void markSupportingConcurrentExecutionAttempts() {
         for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
             final JobVertex jobVertex = entry.getValue();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorSourceSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorSourceSinkTest.java
deleted file mode 100644
index 2f59790b4e0..00000000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorSourceSinkTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
-import org.apache.flink.streaming.util.TestExpandingSink;
-import org.apache.flink.util.TestLoggerExtension;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.util.List;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/**
- * Tests whether a generated job vertex is correctly marked as a source/sink by {@link
- * StreamingJobGraphGenerator}.
- */
-@ExtendWith(TestLoggerExtension.class)
-class StreamingJobGraphGeneratorSourceSinkTest {
-
-    private StreamExecutionEnvironment env;
-
-    @BeforeEach
-    void setUp() {
-        env = StreamExecutionEnvironment.getExecutionEnvironment();
-    }
-
-    @Test
-    void testLegacySource() {
-        env.fromElements(0, 1).map(i -> i);
-
-        final List<JobVertex> verticesSorted = getJobVertices();
-
-        final JobVertex sourceVertex = verticesSorted.get(0);
-        assertThat(sourceVertex.containsSources()).isTrue();
-        assertThat(sourceVertex.containsSinks()).isFalse();
-    }
-
-    @Test
-    void testNewSource() {
-        env.fromSequence(0, 1).map(i -> i);
-
-        final List<JobVertex> verticesSorted = getJobVertices();
-
-        final JobVertex sourceVertex = verticesSorted.get(0);
-        assertThat(sourceVertex.containsSources()).isTrue();
-        assertThat(sourceVertex.containsSinks()).isFalse();
-    }
-
-    @Test
-    void testMultiInputSource() {
-        final DataStream<Long> source1 = env.fromSequence(0, 1);
-        final DataStream<Long> source2 = env.fromSequence(0, 1);
-        final MultipleInputTransformation<Long> multiInputTransform =
-                new MultipleInputTransformation<>(
-                        "multi-input-operator",
-                        new StreamingJobGraphGeneratorTest.UnusedOperatorFactory(),
-                        Types.LONG,
-                        env.getParallelism());
-        multiInputTransform.addInput(source1.map(i -> i).getTransformation());
-        multiInputTransform.addInput(source2.getTransformation());
-        multiInputTransform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
-        env.addOperator(multiInputTransform);
-
-        final List<JobVertex> verticesSorted = getJobVertices();
-
-        final JobVertex source1Vertex = verticesSorted.get(0);
-        assertThat(source1Vertex.containsSources()).isTrue();
-        assertThat(source1Vertex.containsSinks()).isFalse();
-
-        // source-2 is chained with the multi-input vertex
-        final JobVertex multiInputVertex = verticesSorted.get(1);
-        assertThat(multiInputVertex.containsSources()).isTrue();
-        assertThat(multiInputVertex.containsSinks()).isFalse();
-    }
-
-    @Test
-    void testLegacySink() {
-        env.fromElements(0, 1).map(i -> i).startNewChain().addSink(new SinkFunction<Integer>() {});
-
-        final List<JobVertex> verticesSorted = getJobVertices();
-
-        final JobVertex sinkVertex = verticesSorted.get(1);
-        assertThat(sinkVertex.containsSources()).isFalse();
-        assertThat(sinkVertex.containsSinks()).isTrue();
-    }
-
-    @Test
-    void testNewSink() {
-        env.fromElements(0, 1).disableChaining().sinkTo(new TestExpandingSink());
-
-        final List<JobVertex> verticesSorted = getJobVertices();
-
-        final JobVertex sinkVertex = verticesSorted.get(1);
-        assertThat(sinkVertex.containsSources()).isFalse();
-        assertThat(sinkVertex.containsSinks()).isTrue();
-    }
-
-    @Test
-    void testNewSinkWithSinkTopology() {
-        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        env.fromElements(0, 1).disableChaining().sinkTo(new TestExpandingSink());
-
-        final List<JobVertex> verticesSorted = getJobVertices();
-
-        final JobVertex sinkWriterVertex = verticesSorted.get(1);
-        assertThat(sinkWriterVertex.containsSources()).isFalse();
-        assertThat(sinkWriterVertex.containsSinks()).isTrue();
-
-        final JobVertex sinkCommitterVertex = verticesSorted.get(2);
-        assertThat(sinkCommitterVertex.containsSources()).isFalse();
-        assertThat(sinkCommitterVertex.containsSinks()).isTrue();
-
-        final JobVertex sinkPostCommitterVertex = verticesSorted.get(3);
-        assertThat(sinkPostCommitterVertex.containsSources()).isFalse();
-        assertThat(sinkPostCommitterVertex.containsSinks()).isTrue();
-    }
-
-    @Test
-    void testChainedSourceSink() {
-        env.setParallelism(1);
-        env.fromElements(0, 1).sinkTo(new TestExpandingSink());
-
-        final List<JobVertex> verticesSorted = getJobVertices();
-
-        final JobVertex sourceSinkVertex = verticesSorted.get(0);
-        assertThat(sourceSinkVertex.containsSources()).isTrue();
-        assertThat(sourceSinkVertex.containsSinks()).isTrue();
-    }
-
-    private List<JobVertex> getJobVertices() {
-        final StreamGraph streamGraph = env.getStreamGraph();
-        final JobGraph jobGraph = streamGraph.getJobGraph();
-        return jobGraph.getVerticesSortedTopologicallyFromSources();
-    }
-}