Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/13 05:15:27 UTC

[GitHub] [flink] wanglijie95 commented on a diff in pull request #20222: [FLINK-28137] Introduce SpeculativeScheduler

wanglijie95 commented on code in PR #20222:
URL: https://github.com/apache/flink/pull/20222#discussion_r919645557


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##########
@@ -324,13 +332,24 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe
                                 .values());
         final boolean globalRecovery = failureHandlingResult.isGlobalFailure();
 
+        if (globalRecovery) {
+            log.info(
+                    "{} tasks will be restarted to recover from a global failure.",
+                    verticesToRestart.size());
+        } else {
+            checkArgument(failureHandlingResult.getFailedExecution().isPresent());
+            log.info(
+                    "{} tasks will be restarted to recover the failed task {}.",

Review Comment:
   The printed task ID changes from an `ExecutionVertexID` to an `ExecutionAttemptID`, but I think that is more reasonable than before.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##########
@@ -220,46 +217,57 @@ protected void startSchedulingInternal() {
     }
 
     @Override
-    protected void updateTaskExecutionStateInternal(
-            final ExecutionVertexID executionVertexId,
-            final TaskExecutionStateTransition taskExecutionState) {
+    protected void onTaskExecutionStateUpdate(final Execution execution) {
+        switch (execution.getState()) {
+            case FINISHED:
+                onTaskFinished(execution);
+                break;
+            case FAILED:
+                onTaskFailed(execution);
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        String.format(
+                                "State %s should not be notified to DefaultScheduler.",
+                                execution.getState()));
+        }
+    }
 
+    protected void onTaskFinished(final Execution execution) {
+        checkState(execution.getState() == ExecutionState.FINISHED);
+
+        final ExecutionVertexID executionVertexId = execution.getVertex().getID();
         // once a task finishes, it will release the assigned allocation/slot and no longer
         // needs it. Therefore, it should stop reserving the slot so that other tasks are
         // possible to use the slot. Ideally, the `stopReserveAllocation` should happen
         // along with the release slot process. However, that process is hidden in the depth
         // of the ExecutionGraph, so we currently do it in DefaultScheduler after that process
         // is done.
-        if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED) {
-            stopReserveAllocation(executionVertexId);
-        }
+        stopReserveAllocation(executionVertexId);
 
-        schedulingStrategy.onExecutionStateChange(
-                executionVertexId, taskExecutionState.getExecutionState());
-        maybeHandleTaskFailure(taskExecutionState, getCurrentExecutionOfVertex(executionVertexId));
+        schedulingStrategy.onExecutionStateChange(executionVertexId, ExecutionState.FINISHED);

Review Comment:
   `SchedulingStrategy#onExecutionStateChange` is currently only called when a task finishes. How about renaming it to `onExecutionFinish`, or calling it for all state changes?
   
   Otherwise, it may confuse future developers.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java:
##########
@@ -85,6 +85,10 @@ public Collection<Execution> getCurrentExecutions() {
         return Collections.unmodifiableCollection(currentExecutions.values());
     }
 
+    public Execution getCurrentExecutionOrThrow(final ExecutionAttemptID attemptId) {

Review Comment:
   This method is never used.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blocklist.BlockedNode;
+import org.apache.flink.runtime.blocklist.BlocklistHandler;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.SpeculativeExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionOperations;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetector;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetectorListener;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.topology.Vertex;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IterableUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The speculative scheduler. */
+public class SpeculativeScheduler extends AdaptiveBatchScheduler
+        implements SlowTaskDetectorListener {
+
+    private final int maxConcurrentExecutions;
+
+    private final Duration blockSlowNodeDuration;
+
+    private final BlocklistHandler blocklistHandler;
+
+    private final SlowTaskDetector slowTaskDetector;
+
+    public SpeculativeScheduler(
+            final Logger log,
+            final JobGraph jobGraph,
+            final Executor ioExecutor,
+            final Configuration jobMasterConfiguration,
+            final Consumer<ComponentMainThreadExecutor> startUpAction,
+            final ScheduledExecutor delayExecutor,
+            final ClassLoader userCodeLoader,
+            final CheckpointsCleaner checkpointsCleaner,
+            final CheckpointRecoveryFactory checkpointRecoveryFactory,
+            final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            final SchedulingStrategyFactory schedulingStrategyFactory,
+            final FailoverStrategy.Factory failoverStrategyFactory,
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            final ExecutionOperations executionOperations,
+            final ExecutionVertexVersioner executionVertexVersioner,
+            final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+            long initializationTimestamp,
+            final ComponentMainThreadExecutor mainThreadExecutor,
+            final JobStatusListener jobStatusListener,
+            final ExecutionGraphFactory executionGraphFactory,
+            final ShuffleMaster<?> shuffleMaster,
+            final Time rpcTimeout,
+            final VertexParallelismDecider vertexParallelismDecider,
+            final int defaultMaxParallelism,
+            final BlocklistHandler blocklistHandler)
+            throws Exception {
+
+        super(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                startUpAction,
+                delayExecutor,
+                userCodeLoader,
+                checkpointsCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                schedulingStrategyFactory,
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                initializationTimestamp,
+                mainThreadExecutor,
+                jobStatusListener,
+                executionGraphFactory,
+                shuffleMaster,
+                rpcTimeout,
+                vertexParallelismDecider,
+                defaultMaxParallelism);
+
+        this.maxConcurrentExecutions =
+                jobMasterConfiguration.getInteger(
+                        JobManagerOptions.SPECULATIVE_MAX_CONCURRENT_EXECUTIONS);
+
+        this.blockSlowNodeDuration =
+                jobMasterConfiguration.get(JobManagerOptions.BLOCK_SLOW_NODE_DURATION);
+
+        this.blocklistHandler = checkNotNull(blocklistHandler);
+
+        this.slowTaskDetector = new ExecutionTimeBasedSlowTaskDetector(jobMasterConfiguration);
+    }
+
+    @Override
+    protected void startSchedulingInternal() {
+        super.startSchedulingInternal();
+        slowTaskDetector.start(getExecutionGraph(), this, getMainThreadExecutor());
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        CompletableFuture<Void> future = super.closeAsync();
+        slowTaskDetector.stop();
+        return future;
+    }
+
+    @Override
+    public SpeculativeExecutionVertex getExecutionVertex(ExecutionVertexID executionVertexId) {
+        return (SpeculativeExecutionVertex) super.getExecutionVertex(executionVertexId);
+    }
+
+    @Override
+    protected void onTaskFinished(final Execution execution) {
+        // cancel all un-terminated executions because the execution vertex has finished
+        FutureUtils.assertNoException(cancelPendingExecutions(execution.getVertex().getID()));
+
+        initializeVerticesIfPossible();

Review Comment:
   `initializeVerticesIfPossible` is not needed here because it is already called in `super.onTaskFinished(execution)`.
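
A minimal sketch of the point, assuming (as stated above) that the parent's `onTaskFinished` already calls `initializeVerticesIfPossible`. The classes are simplified, hypothetical stand-ins, not the real Flink schedulers:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for the parent scheduler; not the real Flink class.
class BaseSchedulerSketch {
    // Counts how often vertices are (re)initialized, for illustration only.
    final AtomicInteger initializeCalls = new AtomicInteger();

    protected void initializeVerticesIfPossible() {
        initializeCalls.incrementAndGet();
    }

    protected void onTaskFinished() {
        // Assumption: the parent already initializes vertices on task finish.
        initializeVerticesIfPossible();
    }
}

// Simplified stand-in for the speculative scheduler.
class SpeculativeSchedulerSketch extends BaseSchedulerSketch {
    @Override
    protected void onTaskFinished() {
        // Cancelling the pending executions would happen here.
        // No explicit initializeVerticesIfPossible() call: super does it.
        super.onTaskFinished();
    }
}
```

With the explicit call removed, each finished task triggers the initialization exactly once.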



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1784,7 +1784,7 @@ private static JobGraph sourceSinkJobGraph(final int parallelism) {
         return JobGraphTestUtils.streamingJobGraph(source, sink);
     }
 
-    private static JobVertex getOnlyJobVertex(final JobGraph jobGraph) {
+    public static JobVertex getOnlyJobVertex(final JobGraph jobGraph) {

Review Comment:
   This change is not needed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java:
##########
@@ -83,6 +83,15 @@ public Set<ExecutionVertexID> getUnmodifiedExecutionVertices(
                 .collect(Collectors.toSet());
     }
 
+    public Map<ExecutionVertexID, ExecutionVertexVersion> getExecutionVertexVersions(

Review Comment:
   What's the difference between this method and `recordVertexModifications`? Why do we need this?
   
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -308,7 +308,7 @@ public int getParallelism() {
      * @param parallelism The parallelism for the task.
      */
     public void setParallelism(int parallelism) {
-        if (parallelism < 1) {
+        if (parallelism < 1 && parallelism != ExecutionConfig.PARALLELISM_DEFAULT) {

Review Comment:
   Will `ExecutionConfig.PARALLELISM_DEFAULT` ever be passed here?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java:
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
+import org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismDecider;
+import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleTestUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore;
+
+/** A builder to create {@link DefaultScheduler} instances for testing. */
+public class DefaultSchedulerBuilder {

Review Comment:
   How about renaming it to `SchedulerBuilder` and providing `buildDefaultScheduler` and `buildAdaptiveBatchScheduler` methods?
   The comments should also be updated accordingly.
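
A rough sketch of the shape I have in mind. The stub scheduler types, the single `defaultMaxParallelism` setting, and its default value are hypothetical placeholders; the real builder carries many more fields:

```java
// Hypothetical stand-ins for the two scheduler types built by the test builder.
final class DefaultSchedulerStub {}

final class AdaptiveBatchSchedulerStub {
    final int defaultMaxParallelism;

    AdaptiveBatchSchedulerStub(int defaultMaxParallelism) {
        this.defaultMaxParallelism = defaultMaxParallelism;
    }
}

/** A builder to create scheduler instances for testing (sketch only). */
final class SchedulerBuilderSketch {
    // Placeholder default, chosen only for this illustration.
    private int defaultMaxParallelism = 128;

    SchedulerBuilderSketch setDefaultMaxParallelism(int defaultMaxParallelism) {
        this.defaultMaxParallelism = defaultMaxParallelism;
        return this;
    }

    // One build method per scheduler type, sharing the same configured fields.
    DefaultSchedulerStub buildDefaultScheduler() {
        return new DefaultSchedulerStub();
    }

    AdaptiveBatchSchedulerStub buildAdaptiveBatchScheduler() {
        return new AdaptiveBatchSchedulerStub(defaultMaxParallelism);
    }
}
```

Tests would then pick the scheduler type at the call site, e.g. `new SchedulerBuilderSketch().setDefaultMaxParallelism(4).buildAdaptiveBatchScheduler()`.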



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##########
@@ -324,13 +332,24 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe
                                 .values());
         final boolean globalRecovery = failureHandlingResult.isGlobalFailure();
 
+        if (globalRecovery) {

Review Comment:
   I think the restart log changes should go into a separate commit (including the changes in `RestartPipelinedRegionFailoverStrategy`).
   
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -385,7 +386,8 @@ private SchedulerNG createScheduler(
                         initializationTimestamp,
                         getMainThreadExecutor(),
                         fatalErrorHandler,
-                        jobStatusListener);
+                        jobStatusListener,
+                        new NoOpBlocklistHandler());

Review Comment:
   The `JobMaster#blocklistHandler` should be passed here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -746,9 +741,7 @@ private boolean isNotifiable(
         return false;
     }
 
-    protected void updateTaskExecutionStateInternal(
-            final ExecutionVertexID executionVertexId,
-            final TaskExecutionStateTransition taskExecutionState) {}
+    protected abstract void onTaskExecutionStateUpdate(final Execution execution);

Review Comment:
   How about moving the implementation here and providing two abstract methods, `onTaskFinished` and `onTaskFailed`?
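
Roughly the following, as a sketch only. `TaskState`, `TaskExecution`, `SchedulerBaseSketch`, and `RecordingScheduler` are simplified, hypothetical stand-ins for `ExecutionState`, `Execution`, `SchedulerBase`, and a concrete scheduler; the dispatch mirrors the switch currently in `DefaultScheduler#onTaskExecutionStateUpdate`:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins; not the real Flink types.
enum TaskState { CREATED, RUNNING, FINISHED, FAILED }

final class TaskExecution {
    private final TaskState state;

    TaskExecution(TaskState state) {
        this.state = state;
    }

    TaskState getState() {
        return state;
    }
}

abstract class SchedulerBaseSketch {
    // The dispatch lives in the base class and cannot be overridden.
    protected final void onTaskExecutionStateUpdate(final TaskExecution execution) {
        switch (execution.getState()) {
            case FINISHED:
                onTaskFinished(execution);
                break;
            case FAILED:
                onTaskFailed(execution);
                break;
            default:
                throw new IllegalArgumentException(
                        String.format(
                                "State %s should not be notified to the scheduler.",
                                execution.getState()));
        }
    }

    // Subclasses only implement the two hooks.
    protected abstract void onTaskFinished(TaskExecution execution);

    protected abstract void onTaskFailed(TaskExecution execution);
}

// Example subclass that just records which hook was invoked.
final class RecordingScheduler extends SchedulerBaseSketch {
    final List<String> events = new ArrayList<>();

    @Override
    protected void onTaskFinished(TaskExecution execution) {
        events.add("finished");
    }

    @Override
    protected void onTaskFailed(TaskExecution execution) {
        events.add("failed");
    }
}
```

This keeps the state dispatch in one place, so each scheduler subclass only has to supply the two hooks.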



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blocklist.BlockedNode;
+import org.apache.flink.runtime.blocklist.BlocklistHandler;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.SpeculativeExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionOperations;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetector;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetectorListener;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.topology.Vertex;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IterableUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The speculative scheduler. */
+public class SpeculativeScheduler extends AdaptiveBatchScheduler
+        implements SlowTaskDetectorListener {
+
+    private final int maxConcurrentExecutions;
+
+    private final Duration blockSlowNodeDuration;
+
+    private final BlocklistHandler blocklistHandler;
+
+    private final SlowTaskDetector slowTaskDetector;
+
+    public SpeculativeScheduler(
+            final Logger log,
+            final JobGraph jobGraph,
+            final Executor ioExecutor,
+            final Configuration jobMasterConfiguration,
+            final Consumer<ComponentMainThreadExecutor> startUpAction,
+            final ScheduledExecutor delayExecutor,
+            final ClassLoader userCodeLoader,
+            final CheckpointsCleaner checkpointsCleaner,
+            final CheckpointRecoveryFactory checkpointRecoveryFactory,
+            final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            final SchedulingStrategyFactory schedulingStrategyFactory,
+            final FailoverStrategy.Factory failoverStrategyFactory,
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            final ExecutionOperations executionOperations,
+            final ExecutionVertexVersioner executionVertexVersioner,
+            final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+            long initializationTimestamp,
+            final ComponentMainThreadExecutor mainThreadExecutor,
+            final JobStatusListener jobStatusListener,
+            final ExecutionGraphFactory executionGraphFactory,
+            final ShuffleMaster<?> shuffleMaster,
+            final Time rpcTimeout,
+            final VertexParallelismDecider vertexParallelismDecider,
+            final int defaultMaxParallelism,
+            final BlocklistHandler blocklistHandler)
+            throws Exception {
+
+        super(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                startUpAction,
+                delayExecutor,
+                userCodeLoader,
+                checkpointsCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                schedulingStrategyFactory,
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                initializationTimestamp,
+                mainThreadExecutor,
+                jobStatusListener,
+                executionGraphFactory,
+                shuffleMaster,
+                rpcTimeout,
+                vertexParallelismDecider,
+                defaultMaxParallelism);
+
+        this.maxConcurrentExecutions =
+                jobMasterConfiguration.getInteger(
+                        JobManagerOptions.SPECULATIVE_MAX_CONCURRENT_EXECUTIONS);
+
+        this.blockSlowNodeDuration =
+                jobMasterConfiguration.get(JobManagerOptions.BLOCK_SLOW_NODE_DURATION);
+
+        this.blocklistHandler = checkNotNull(blocklistHandler);
+
+        this.slowTaskDetector = new ExecutionTimeBasedSlowTaskDetector(jobMasterConfiguration);
+    }
+
+    @Override
+    protected void startSchedulingInternal() {
+        super.startSchedulingInternal();
+        slowTaskDetector.start(getExecutionGraph(), this, getMainThreadExecutor());
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        CompletableFuture<Void> future = super.closeAsync();
+        slowTaskDetector.stop();
+        return future;
+    }
+
+    @Override
+    public SpeculativeExecutionVertex getExecutionVertex(ExecutionVertexID executionVertexId) {
+        return (SpeculativeExecutionVertex) super.getExecutionVertex(executionVertexId);
+    }
+
+    @Override
+    protected void onTaskFinished(final Execution execution) {
+        // cancel all un-terminated executions because the execution vertex has finished
+        FutureUtils.assertNoException(cancelPendingExecutions(execution.getVertex().getID()));
+
+        initializeVerticesIfPossible();
+
+        super.onTaskFinished(execution);
+    }
+
+    private CompletableFuture<?> cancelPendingExecutions(
+            final ExecutionVertexID executionVertexId) {
+        final SpeculativeExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
+
+        final List<CompletableFuture<?>> cancelingFutures = new ArrayList<>();
+        for (Execution execution : executionVertex.getCurrentExecutions()) {
+            if (!execution.getState().isTerminal()) {
+                execution.cancel();
+                cancelingFutures.add(execution.getReleaseFuture());
+            }
+        }
+        cancelAllPendingSlotRequests(executionVertexId);
+        return FutureUtils.combineAll(cancelingFutures);
+    }
+
+    @Override
+    protected void onTaskFailed(final Execution execution) {
+        final SpeculativeExecutionVertex executionVertex =
+                getExecutionVertex(execution.getVertex().getID());
+        final ExecutionAttemptID attemptId = execution.getAttemptId();
+
+        // when an execution fails, remove it from current executions to make room for future
+        // speculative executions
+        executionVertex.archiveFailedExecution(attemptId);
+        executionSlotAllocator.cancel(attemptId);
+
+        super.onTaskFailed(execution);
+    }
+
+    @Override
+    protected void handleTaskFailure(
+            final Execution failedExecution, @Nullable final Throwable error) {
+
+        final SpeculativeExecutionVertex executionVertex =
+                getExecutionVertex(failedExecution.getVertex().getID());
+
+        // if the execution vertex is not possible finish or a PartitionException occurred, trigger
+        // an execution vertex failover to recover
+        if (!isExecutionVertexPossibleToFinish(executionVertex)
+                || ExceptionUtils.findThrowable(error, PartitionException.class).isPresent()) {
+            super.handleTaskFailure(failedExecution, error);
+        } else {
+            // add the execution failure to exception history even though not restarting the entire
+            // execution vertex
+            final long timestamp = System.currentTimeMillis();
+            setGlobalFailureCause(error, timestamp);
+            final FailureHandlingResult failureHandlingResult =
+                    executionFailureHandler.getFailureHandlingResult(
+                            failedExecution, error, timestamp);
+            if (failureHandlingResult.canRestart()) {
+                archiveFromFailureHandlingResult(
+                        createFailureHandlingResultSnapshot(failureHandlingResult));
+            } else {
+                failJob(error, timestamp);
+            }
+        }
+    }
+
+    private static boolean isExecutionVertexPossibleToFinish(
+            final SpeculativeExecutionVertex executionVertex) {
+        boolean anyExecutionPossibleToFinish = false;
+        for (Execution execution : executionVertex.getCurrentExecutions()) {
+            // if any execution has finished, no execution of the same execution vertex should fail
+            // after that
+            checkState(execution.getState() != ExecutionState.FINISHED);
+
+            if (execution.getState() == ExecutionState.CREATED
+                    || execution.getState() == ExecutionState.SCHEDULED
+                    || execution.getState() == ExecutionState.DEPLOYING
+                    || execution.getState() == ExecutionState.INITIALIZING
+                    || execution.getState() == ExecutionState.RUNNING) {
+                anyExecutionPossibleToFinish = true;
+            }
+        }
+        return anyExecutionPossibleToFinish;
+    }
+
+    @Override
+    protected void cancelAllPendingSlotRequestsInternal() {

Review Comment:
   `cancelAllPendingSlotRequestsInternal`, `cancelAllPendingSlotRequestsForVertices`, and `cancelAllPendingSlotRequests` are not needed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blocklist.BlockedNode;
+import org.apache.flink.runtime.blocklist.BlocklistHandler;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.SpeculativeExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionOperations;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetector;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetectorListener;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.topology.Vertex;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IterableUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The speculative scheduler. */
+public class SpeculativeScheduler extends AdaptiveBatchScheduler
+        implements SlowTaskDetectorListener {
+
+    private final int maxConcurrentExecutions;
+
+    private final Duration blockSlowNodeDuration;
+
+    private final BlocklistHandler blocklistHandler;
+
+    private final SlowTaskDetector slowTaskDetector;
+
+    public SpeculativeScheduler(
+            final Logger log,
+            final JobGraph jobGraph,
+            final Executor ioExecutor,
+            final Configuration jobMasterConfiguration,
+            final Consumer<ComponentMainThreadExecutor> startUpAction,
+            final ScheduledExecutor delayExecutor,
+            final ClassLoader userCodeLoader,
+            final CheckpointsCleaner checkpointsCleaner,
+            final CheckpointRecoveryFactory checkpointRecoveryFactory,
+            final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            final SchedulingStrategyFactory schedulingStrategyFactory,
+            final FailoverStrategy.Factory failoverStrategyFactory,
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            final ExecutionOperations executionOperations,
+            final ExecutionVertexVersioner executionVertexVersioner,
+            final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+            long initializationTimestamp,
+            final ComponentMainThreadExecutor mainThreadExecutor,
+            final JobStatusListener jobStatusListener,
+            final ExecutionGraphFactory executionGraphFactory,
+            final ShuffleMaster<?> shuffleMaster,
+            final Time rpcTimeout,
+            final VertexParallelismDecider vertexParallelismDecider,
+            final int defaultMaxParallelism,
+            final BlocklistHandler blocklistHandler)
+            throws Exception {
+
+        super(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                startUpAction,
+                delayExecutor,
+                userCodeLoader,
+                checkpointsCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                schedulingStrategyFactory,
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                initializationTimestamp,
+                mainThreadExecutor,
+                jobStatusListener,
+                executionGraphFactory,
+                shuffleMaster,
+                rpcTimeout,
+                vertexParallelismDecider,
+                defaultMaxParallelism);
+
+        this.maxConcurrentExecutions =
+                jobMasterConfiguration.getInteger(
+                        JobManagerOptions.SPECULATIVE_MAX_CONCURRENT_EXECUTIONS);
+
+        this.blockSlowNodeDuration =
+                jobMasterConfiguration.get(JobManagerOptions.BLOCK_SLOW_NODE_DURATION);
+
+        this.blocklistHandler = checkNotNull(blocklistHandler);
+
+        this.slowTaskDetector = new ExecutionTimeBasedSlowTaskDetector(jobMasterConfiguration);
+    }
+
+    @Override
+    protected void startSchedulingInternal() {
+        super.startSchedulingInternal();
+        slowTaskDetector.start(getExecutionGraph(), this, getMainThreadExecutor());
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        CompletableFuture<Void> future = super.closeAsync();
+        slowTaskDetector.stop();
+        return future;
+    }
+
+    @Override
+    public SpeculativeExecutionVertex getExecutionVertex(ExecutionVertexID executionVertexId) {
+        return (SpeculativeExecutionVertex) super.getExecutionVertex(executionVertexId);
+    }
+
+    @Override
+    protected void onTaskFinished(final Execution execution) {
+        // cancel all un-terminated executions because the execution vertex has finished
+        FutureUtils.assertNoException(cancelPendingExecutions(execution.getVertex().getID()));
+
+        initializeVerticesIfPossible();
+
+        super.onTaskFinished(execution);
+    }
+
+    private CompletableFuture<?> cancelPendingExecutions(
+            final ExecutionVertexID executionVertexId) {
+        final SpeculativeExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
+
+        final List<CompletableFuture<?>> cancelingFutures = new ArrayList<>();
+        for (Execution execution : executionVertex.getCurrentExecutions()) {
+            if (!execution.getState().isTerminal()) {
+                execution.cancel();
+                cancelingFutures.add(execution.getReleaseFuture());
+            }
+        }
+        cancelAllPendingSlotRequests(executionVertexId);
+        return FutureUtils.combineAll(cancelingFutures);
+    }
+
+    @Override
+    protected void onTaskFailed(final Execution execution) {
+        final SpeculativeExecutionVertex executionVertex =
+                getExecutionVertex(execution.getVertex().getID());
+        final ExecutionAttemptID attemptId = execution.getAttemptId();
+
+        // when an execution fails, remove it from current executions to make room for future
+        // speculative executions
+        executionVertex.archiveFailedExecution(attemptId);
+        executionSlotAllocator.cancel(attemptId);
+
+        super.onTaskFailed(execution);
+    }
+
+    @Override
+    protected void handleTaskFailure(
+            final Execution failedExecution, @Nullable final Throwable error) {
+
+        final SpeculativeExecutionVertex executionVertex =
+                getExecutionVertex(failedExecution.getVertex().getID());
+
+        // if the execution vertex is not possible finish or a PartitionException occurred, trigger
+        // an execution vertex failover to recover
+        if (!isExecutionVertexPossibleToFinish(executionVertex)
+                || ExceptionUtils.findThrowable(error, PartitionException.class).isPresent()) {
+            super.handleTaskFailure(failedExecution, error);
+        } else {
+            // add the execution failure to exception history even though not restarting the entire
+            // execution vertex
+            final long timestamp = System.currentTimeMillis();
+            setGlobalFailureCause(error, timestamp);
+            final FailureHandlingResult failureHandlingResult =
+                    executionFailureHandler.getFailureHandlingResult(
+                            failedExecution, error, timestamp);
+            if (failureHandlingResult.canRestart()) {
+                archiveFromFailureHandlingResult(
+                        createFailureHandlingResultSnapshot(failureHandlingResult));
+            } else {
+                failJob(error, timestamp);
+            }
+        }
+    }
+
+    private static boolean isExecutionVertexPossibleToFinish(
+            final SpeculativeExecutionVertex executionVertex) {
+        boolean anyExecutionPossibleToFinish = false;
+        for (Execution execution : executionVertex.getCurrentExecutions()) {
+            // if any execution has finished, no execution of the same execution vertex should fail
+            // after that
+            checkState(execution.getState() != ExecutionState.FINISHED);
+
+            if (execution.getState() == ExecutionState.CREATED
+                    || execution.getState() == ExecutionState.SCHEDULED
+                    || execution.getState() == ExecutionState.DEPLOYING
+                    || execution.getState() == ExecutionState.INITIALIZING
+                    || execution.getState() == ExecutionState.RUNNING) {
+                anyExecutionPossibleToFinish = true;
+            }
+        }
+        return anyExecutionPossibleToFinish;
+    }
+
+    @Override
+    protected void cancelAllPendingSlotRequestsInternal() {
+        IterableUtils.toStream(getSchedulingTopology().getVertices())
+                .map(Vertex::getId)
+                .forEach(this::cancelAllPendingSlotRequests);
+    }
+
+    @Override
+    protected void cancelAllPendingSlotRequestsForVertices(
+            final Set<ExecutionVertexID> executionVertices) {
+        executionVertices.forEach(this::cancelAllPendingSlotRequests);
+    }
+
+    private void cancelAllPendingSlotRequests(final ExecutionVertexID executionVertexId) {
+        final SpeculativeExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
+        executionVertex
+                .getCurrentExecutions()
+                .forEach(e -> executionSlotAllocator.cancel(e.getAttemptId()));
+    }
+
+    @Override
+    public void notifySlowTasks(Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks) {
+        // add slow nodes to blocklist before scheduling new speculative executions
+        final long blockedEndTimestamp =
+                System.currentTimeMillis() + blockSlowNodeDuration.toMillis();
+        final Collection<BlockedNode> nodesToBlock =
+                getSlowNodeIds(slowTasks).stream()
+                        .map(
+                                nodeId ->
+                                        new BlockedNode(
+                                                nodeId,
+                                                "Node is detected to be slow.",
+                                                blockedEndTimestamp))
+                        .collect(Collectors.toList());
+        blocklistHandler.addNewBlockedNodes(nodesToBlock);
+
+        final List<Execution> newSpeculativeExecutions = new ArrayList<>();
+        final Set<ExecutionVertexID> verticesToDeploy = new HashSet<>();
+        for (ExecutionVertexID executionVertexId : slowTasks.keySet()) {
+            final SpeculativeExecutionVertex executionVertex =
+                    getExecutionVertex(executionVertexId);
+
+            if (executionVertex.containsSources() || executionVertex.containsSinks()) {
+                continue;
+            }
+
+            final int currentConcurrentExecutions = executionVertex.getCurrentExecutions().size();
+            final int newSpeculativeExecutionsToDeploy =
+                    maxConcurrentExecutions - currentConcurrentExecutions;
+            if (newSpeculativeExecutionsToDeploy > 0) {
+                log.info(
+                        "{} ({}) is detected as a slow vertex, create and deploy {} new speculative executions for it.",

Review Comment:
   Maybe also print the number of current concurrent executions here.
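
   A minimal sketch of what such a message could look like. The PR logs through SLF4J's `log.info` with `{}` placeholders; the sketch below uses `String.format` only so that it runs on its own. The helper name, its parameters and the sample values are assumptions for illustration, not code from the PR.

```java
final class SlowVertexLogSketch {

    // Hypothetical helper: builds the log text with the current number of
    // concurrent executions added next to the number of new ones to deploy.
    static String slowVertexMessage(
            String vertexName, String vertexId, int currentExecutions, int maxExecutions) {
        // Mirrors the arithmetic in the diff: max concurrent minus current concurrent.
        int toDeploy = maxExecutions - currentExecutions;
        return String.format(
                "%s (%s) is detected as a slow vertex with %d current concurrent executions, "
                        + "create and deploy %d new speculative executions for it.",
                vertexName, vertexId, currentExecutions, toDeploy);
    }

    public static void main(String[] args) {
        // Sample values are made up for the example.
        System.out.println(slowVertexMessage("Map (1/4)", "vertex_0", 1, 2));
    }
}
```

   In the scheduler itself this would presumably amount to one more `{}` placeholder and passing `currentConcurrentExecutions` as an extra argument.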



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blocklist.BlockedNode;
+import org.apache.flink.runtime.blocklist.BlocklistHandler;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.SpeculativeExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionOperations;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetector;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetectorListener;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.topology.Vertex;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IterableUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The speculative scheduler. */
+public class SpeculativeScheduler extends AdaptiveBatchScheduler
+        implements SlowTaskDetectorListener {
+
+    private final int maxConcurrentExecutions;
+
+    private final Duration blockSlowNodeDuration;
+
+    private final BlocklistHandler blocklistHandler;
+
+    private final SlowTaskDetector slowTaskDetector;
+
+    public SpeculativeScheduler(
+            final Logger log,
+            final JobGraph jobGraph,
+            final Executor ioExecutor,
+            final Configuration jobMasterConfiguration,
+            final Consumer<ComponentMainThreadExecutor> startUpAction,
+            final ScheduledExecutor delayExecutor,
+            final ClassLoader userCodeLoader,
+            final CheckpointsCleaner checkpointsCleaner,
+            final CheckpointRecoveryFactory checkpointRecoveryFactory,
+            final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            final SchedulingStrategyFactory schedulingStrategyFactory,
+            final FailoverStrategy.Factory failoverStrategyFactory,
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            final ExecutionOperations executionOperations,
+            final ExecutionVertexVersioner executionVertexVersioner,
+            final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+            long initializationTimestamp,
+            final ComponentMainThreadExecutor mainThreadExecutor,
+            final JobStatusListener jobStatusListener,
+            final ExecutionGraphFactory executionGraphFactory,
+            final ShuffleMaster<?> shuffleMaster,
+            final Time rpcTimeout,
+            final VertexParallelismDecider vertexParallelismDecider,
+            final int defaultMaxParallelism,
+            final BlocklistHandler blocklistHandler)
+            throws Exception {
+
+        super(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                startUpAction,
+                delayExecutor,
+                userCodeLoader,
+                checkpointsCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                schedulingStrategyFactory,
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                initializationTimestamp,
+                mainThreadExecutor,
+                jobStatusListener,
+                executionGraphFactory,
+                shuffleMaster,
+                rpcTimeout,
+                vertexParallelismDecider,
+                defaultMaxParallelism);
+
+        this.maxConcurrentExecutions =
+                jobMasterConfiguration.getInteger(
+                        JobManagerOptions.SPECULATIVE_MAX_CONCURRENT_EXECUTIONS);
+
+        this.blockSlowNodeDuration =
+                jobMasterConfiguration.get(JobManagerOptions.BLOCK_SLOW_NODE_DURATION);
+
+        this.blocklistHandler = checkNotNull(blocklistHandler);
+
+        this.slowTaskDetector = new ExecutionTimeBasedSlowTaskDetector(jobMasterConfiguration);
+    }
+
+    @Override
+    protected void startSchedulingInternal() {
+        super.startSchedulingInternal();
+        slowTaskDetector.start(getExecutionGraph(), this, getMainThreadExecutor());
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        CompletableFuture<Void> future = super.closeAsync();
+        slowTaskDetector.stop();
+        return future;
+    }
+
+    @Override
+    public SpeculativeExecutionVertex getExecutionVertex(ExecutionVertexID executionVertexId) {
+        return (SpeculativeExecutionVertex) super.getExecutionVertex(executionVertexId);
+    }
+
+    @Override
+    protected void onTaskFinished(final Execution execution) {
+        // cancel all un-terminated executions because the execution vertex has finished
+        FutureUtils.assertNoException(cancelPendingExecutions(execution.getVertex().getID()));
+
+        initializeVerticesIfPossible();
+
+        super.onTaskFinished(execution);
+    }
+
+    private CompletableFuture<?> cancelPendingExecutions(
+            final ExecutionVertexID executionVertexId) {
+        final SpeculativeExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
+
+        final List<CompletableFuture<?>> cancelingFutures = new ArrayList<>();
+        for (Execution execution : executionVertex.getCurrentExecutions()) {
+            if (!execution.getState().isTerminal()) {
+                execution.cancel();
+                cancelingFutures.add(execution.getReleaseFuture());
+            }
+        }
+        cancelAllPendingSlotRequests(executionVertexId);
+        return FutureUtils.combineAll(cancelingFutures);
+    }
+
+    @Override
+    protected void onTaskFailed(final Execution execution) {
+        final SpeculativeExecutionVertex executionVertex =
+                getExecutionVertex(execution.getVertex().getID());
+        final ExecutionAttemptID attemptId = execution.getAttemptId();
+
+        // when an execution fails, remove it from current executions to make room for future
+        // speculative executions
+        executionVertex.archiveFailedExecution(attemptId);
+        executionSlotAllocator.cancel(attemptId);
+
+        super.onTaskFailed(execution);
+    }
+
+    @Override
+    protected void handleTaskFailure(
+            final Execution failedExecution, @Nullable final Throwable error) {
+
+        final SpeculativeExecutionVertex executionVertex =
+                getExecutionVertex(failedExecution.getVertex().getID());
+
+        // if the execution vertex is not possible finish or a PartitionException occurred, trigger
+        // an execution vertex failover to recover
+        if (!isExecutionVertexPossibleToFinish(executionVertex)
+                || ExceptionUtils.findThrowable(error, PartitionException.class).isPresent()) {
+            super.handleTaskFailure(failedExecution, error);
+        } else {
+            // add the execution failure to exception history even though not restarting the entire
+            // execution vertex
+            final long timestamp = System.currentTimeMillis();
+            setGlobalFailureCause(error, timestamp);
+            final FailureHandlingResult failureHandlingResult =
+                    executionFailureHandler.getFailureHandlingResult(
+                            failedExecution, error, timestamp);
+            if (failureHandlingResult.canRestart()) {
+                archiveFromFailureHandlingResult(
+                        createFailureHandlingResultSnapshot(failureHandlingResult));
+            } else {
+                failJob(error, timestamp);
+            }
+        }
+    }
+
+    private static boolean isExecutionVertexPossibleToFinish(
+            final SpeculativeExecutionVertex executionVertex) {
+        boolean anyExecutionPossibleToFinish = false;
+        for (Execution execution : executionVertex.getCurrentExecutions()) {
+            // if any execution has finished, no execution of the same execution vertex should fail
+            // after that
+            checkState(execution.getState() != ExecutionState.FINISHED);
+
+            if (execution.getState() == ExecutionState.CREATED
+                    || execution.getState() == ExecutionState.SCHEDULED
+                    || execution.getState() == ExecutionState.DEPLOYING
+                    || execution.getState() == ExecutionState.INITIALIZING
+                    || execution.getState() == ExecutionState.RUNNING) {
+                anyExecutionPossibleToFinish = true;
+            }
+        }
+        return anyExecutionPossibleToFinish;
+    }
+
+    @Override
+    protected void cancelAllPendingSlotRequestsInternal() {
+        IterableUtils.toStream(getSchedulingTopology().getVertices())
+                .map(Vertex::getId)
+                .forEach(this::cancelAllPendingSlotRequests);
+    }
+
+    @Override
+    protected void cancelAllPendingSlotRequestsForVertices(
+            final Set<ExecutionVertexID> executionVertices) {
+        executionVertices.forEach(this::cancelAllPendingSlotRequests);
+    }
+
+    private void cancelAllPendingSlotRequests(final ExecutionVertexID executionVertexId) {
+        final SpeculativeExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
+        executionVertex
+                .getCurrentExecutions()
+                .forEach(e -> executionSlotAllocator.cancel(e.getAttemptId()));
+    }
+
+    @Override
+    public void notifySlowTasks(Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks) {
+        // add slow nodes to blocklist before scheduling new speculative executions
+        final long blockedEndTimestamp =
+                System.currentTimeMillis() + blockSlowNodeDuration.toMillis();
+        final Collection<BlockedNode> nodesToBlock =
+                getSlowNodeIds(slowTasks).stream()
+                        .map(
+                                nodeId ->
+                                        new BlockedNode(
+                                                nodeId,
+                                                "Node is detected to be slow.",
+                                                blockedEndTimestamp))
+                        .collect(Collectors.toList());
+        blocklistHandler.addNewBlockedNodes(nodesToBlock);
+
+        final List<Execution> newSpeculativeExecutions = new ArrayList<>();
+        final Set<ExecutionVertexID> verticesToDeploy = new HashSet<>();
+        for (ExecutionVertexID executionVertexId : slowTasks.keySet()) {
+            final SpeculativeExecutionVertex executionVertex =
+                    getExecutionVertex(executionVertexId);
+
+            if (executionVertex.containsSources() || executionVertex.containsSinks()) {

Review Comment:
   I assume this logic will change once speculative execution supports sources?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java:
##########
@@ -221,24 +204,26 @@ public void testReturningLogicalSlotsRemovesSharedSlot() throws Exception {
     }
 
     @Test
-    public void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() throws Exception {
+    void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() throws Exception {
         // physical slot request is not completed and does not complete logical requests
         testLogicalSlotRequestCancellationOrRelease(
                 true,
                 true,
                 (context, assignment) -> {
                     context.getAllocator().cancel(assignment.getExecutionAttemptId());
-                    try {
-                        assignment.getLogicalSlotFuture().get();
-                        fail("The logical future must finish with the cancellation exception");
-                    } catch (InterruptedException | ExecutionException e) {
-                        assertThat(e.getCause(), instanceOf(CancellationException.class));
-                    }
+                    assertThatThrownBy(
+                                    () -> {
+                                        context.getAllocator()
+                                                .cancel(assignment.getExecutionAttemptId());
+                                        assignment.getLogicalSlotFuture().get();
+                                    })
+                            .as("The logical future must finish with the cancellation exception.")
+                            .hasCauseInstanceOf(CancellationException.class);

Review Comment:
   Maybe use `hasRootCauseInstanceOf` instead?
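
   For context: AssertJ's `hasCauseInstanceOf` checks only the direct cause of the thrown exception, while `hasRootCauseInstanceOf` checks the last exception in the cause chain. The plain-JDK sketch below illustrates the difference. The `rootCauseOf` helper and the extra `RuntimeException` wrapper are assumptions made up for the example and are not part of the test in this PR.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

final class CauseChainSketch {

    // Walks the cause chain to its end; conceptually what a root-cause check inspects.
    static Throwable rootCauseOf(Throwable throwable) {
        Throwable current = throwable;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Hypothetical failure with one extra wrapper between the exception
        // thrown by Future#get and the actual cancellation.
        Throwable thrown =
                new ExecutionException(
                        new RuntimeException("wrapper", new CancellationException("cancelled")));

        // The direct cause is the wrapper, so a direct-cause check would not match.
        System.out.println(thrown.getCause() instanceof CancellationException);
        // The root cause is the cancellation, so a root-cause check would match.
        System.out.println(rootCauseOf(thrown) instanceof CancellationException);
    }
}
```

   When the `CancellationException` is the direct cause, both checks pass; the root-cause variant only matters if another wrapper ends up in between.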


