You are viewing a plain text version of this content. The canonical link for it (a hyperlink on the original page) is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/06/24 07:25:47 UTC
[flink] branch master updated: [FLINK-28133][runtime] Rework DefaultScheduler to directly deploy executions
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c66101d5186 [FLINK-28133][runtime] Rework DefaultScheduler to directly deploy executions
c66101d5186 is described below
commit c66101d518615ec2d6a36c829568583a6a5d93f9
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Fri Jun 10 15:33:51 2022 +0800
[FLINK-28133][runtime] Rework DefaultScheduler to directly deploy executions
This closes #20039.
---
.../flink/runtime/executiongraph/Execution.java | 2 +-
.../scheduler/DefaultExecutionDeployer.java | 400 +++++++++++++++++++++
...ations.java => DefaultExecutionOperations.java} | 19 +-
.../flink/runtime/scheduler/DefaultScheduler.java | 315 ++--------------
.../runtime/scheduler/DefaultSchedulerFactory.java | 7 +-
.../flink/runtime/scheduler/DeploymentHandle.java | 78 ----
.../flink/runtime/scheduler/ExecutionDeployer.java | 81 +++++
...texOperations.java => ExecutionOperations.java} | 30 +-
.../runtime/scheduler/ExecutionSlotAllocator.java | 15 +-
...ssignment.java => ExecutionSlotAssignment.java} | 21 +-
.../apache/flink/runtime/scheduler/SharedSlot.java | 2 +-
.../SlotSharingExecutionSlotAllocator.java | 61 +++-
.../adaptivebatch/AdaptiveBatchScheduler.java | 10 +-
.../AdaptiveBatchSchedulerFactory.java | 4 +-
.../scheduler/DefaultExecutionDeployerTest.java | 388 ++++++++++++++++++++
.../runtime/scheduler/DefaultSchedulerTest.java | 59 +--
.../runtime/scheduler/DeploymentHandleTest.java | 94 -----
.../runtime/scheduler/SchedulerTestingUtils.java | 24 +-
.../SlotSharingExecutionSlotAllocatorTest.java | 52 +--
.../TestExecutionOperationsDecorator.java | 156 ++++++++
.../scheduler/TestExecutionSlotAllocator.java | 110 +++---
.../TestExecutionVertexOperationsDecorator.java | 133 -------
.../AdaptiveBatchSchedulerTestUtils.java | 2 +-
23 files changed, 1329 insertions(+), 734 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index b18f9b2fa23..79c2cb9602f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -888,7 +888,7 @@ public class Execution
*
* @param t The exception that caused the task to fail.
*/
- void markFailed(Throwable t) {
+ public void markFailed(Throwable t) {
processFail(t, false);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java
new file mode 100644
index 00000000000..e5b7d18b209
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Default implementation of {@link ExecutionDeployer}. */
+public class DefaultExecutionDeployer implements ExecutionDeployer {
+
+ private final Logger log;
+
+ private final ExecutionSlotAllocator executionSlotAllocator;
+
+ private final ExecutionOperations executionOperations;
+
+ private final ExecutionVertexVersioner executionVertexVersioner;
+
+ private final Time partitionRegistrationTimeout;
+
+ private final Function<ExecutionAttemptID, Execution> executionRetriever;
+
+ private final BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc;
+
+ private final ComponentMainThreadExecutor mainThreadExecutor;
+
+ private DefaultExecutionDeployer(
+ final Logger log,
+ final ExecutionSlotAllocator executionSlotAllocator,
+ final ExecutionOperations executionOperations,
+ final ExecutionVertexVersioner executionVertexVersioner,
+ final Time partitionRegistrationTimeout,
+ final Function<ExecutionAttemptID, Execution> executionRetriever,
+ final BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc,
+ final ComponentMainThreadExecutor mainThreadExecutor) {
+
+ this.log = checkNotNull(log);
+ this.executionSlotAllocator = checkNotNull(executionSlotAllocator);
+ this.executionOperations = checkNotNull(executionOperations);
+ this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
+ this.partitionRegistrationTimeout = checkNotNull(partitionRegistrationTimeout);
+ this.executionRetriever = checkNotNull(executionRetriever);
+ this.allocationReservationFunc = checkNotNull(allocationReservationFunc);
+ this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+ }
+
+ @Override
+ public void allocateSlotsAndDeploy(
+ final List<Execution> executionsToDeploy,
+ final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex) {
+ validateExecutionStates(executionsToDeploy);
+
+ transitionToScheduled(executionsToDeploy);
+
+ final List<ExecutionSlotAssignment> executionSlotAssignments =
+ allocateSlotsFor(executionsToDeploy);
+
+ final List<ExecutionDeploymentHandle> deploymentHandles =
+ createDeploymentHandles(requiredVersionByVertex, executionSlotAssignments);
+
+ waitForAllSlotsAndDeploy(deploymentHandles);
+ }
+
+ private void validateExecutionStates(final Collection<Execution> executionsToDeploy) {
+ executionsToDeploy.forEach(
+ e ->
+ checkState(
+ e.getState() == ExecutionState.CREATED,
+ "Expected execution %s to be in CREATED state, was: %s",
+ e.getAttemptId(),
+ e.getState()));
+ }
+
+ private void transitionToScheduled(final List<Execution> executionsToDeploy) {
+ executionsToDeploy.forEach(e -> e.transitionState(ExecutionState.SCHEDULED));
+ }
+
+ private List<ExecutionSlotAssignment> allocateSlotsFor(
+ final List<Execution> executionsToDeploy) {
+ final List<ExecutionAttemptID> executionAttemptIds =
+ executionsToDeploy.stream()
+ .map(Execution::getAttemptId)
+ .collect(Collectors.toList());
+ return executionSlotAllocator.allocateSlotsFor(executionAttemptIds);
+ }
+
+ private List<ExecutionDeploymentHandle> createDeploymentHandles(
+ final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex,
+ final List<ExecutionSlotAssignment> executionSlotAssignments) {
+
+ return executionSlotAssignments.stream()
+ .map(
+ executionSlotAssignment -> {
+ final Execution execution =
+ getExecutionOrThrow(
+ executionSlotAssignment.getExecutionAttemptId());
+ final ExecutionVertexID executionVertexId =
+ execution.getVertex().getID();
+ return new ExecutionDeploymentHandle(
+ executionSlotAssignment,
+ requiredVersionByVertex.get(executionVertexId));
+ })
+ .collect(Collectors.toList());
+ }
+
+ private void waitForAllSlotsAndDeploy(final List<ExecutionDeploymentHandle> deploymentHandles) {
+ FutureUtils.assertNoException(
+ assignAllResourcesAndRegisterProducedPartitions(deploymentHandles)
+ .handle(deployAll(deploymentHandles)));
+ }
+
+ private CompletableFuture<Void> assignAllResourcesAndRegisterProducedPartitions(
+ final List<ExecutionDeploymentHandle> deploymentHandles) {
+ final List<CompletableFuture<Void>> resultFutures = new ArrayList<>();
+ for (ExecutionDeploymentHandle deploymentHandle : deploymentHandles) {
+ final CompletableFuture<Void> resultFuture =
+ deploymentHandle
+ .getLogicalSlotFuture()
+ .handle(assignResource(deploymentHandle))
+ .thenCompose(registerProducedPartitions(deploymentHandle))
+ .handle(
+ (ignore, throwable) -> {
+ if (throwable != null) {
+ handleTaskDeploymentFailure(
+ deploymentHandle.getExecutionAttemptId(),
+ throwable);
+ }
+ return null;
+ });
+
+ resultFutures.add(resultFuture);
+ }
+ return FutureUtils.waitForAll(resultFutures);
+ }
+
+ private BiFunction<Void, Throwable, Void> deployAll(
+ final List<ExecutionDeploymentHandle> deploymentHandles) {
+ return (ignored, throwable) -> {
+ propagateIfNonNull(throwable);
+ for (final ExecutionDeploymentHandle deploymentHandle : deploymentHandles) {
+ final CompletableFuture<LogicalSlot> slotAssigned =
+ deploymentHandle.getLogicalSlotFuture();
+ checkState(slotAssigned.isDone());
+
+ FutureUtils.assertNoException(
+ slotAssigned.handle(deployOrHandleError(deploymentHandle)));
+ }
+ return null;
+ };
+ }
+
+ private static void propagateIfNonNull(final Throwable throwable) {
+ if (throwable != null) {
+ throw new CompletionException(throwable);
+ }
+ }
+
+ private BiFunction<LogicalSlot, Throwable, LogicalSlot> assignResource(
+ final ExecutionDeploymentHandle deploymentHandle) {
+
+ return (logicalSlot, throwable) -> {
+ final ExecutionVertexVersion requiredVertexVersion =
+ deploymentHandle.getRequiredVertexVersion();
+ final Optional<Execution> optionalExecution =
+ getExecution(deploymentHandle.getExecutionAttemptId());
+
+ if (!optionalExecution.isPresent()
+ || optionalExecution.get().getState() != ExecutionState.SCHEDULED
+ || executionVertexVersioner.isModified(requiredVertexVersion)) {
+ if (throwable == null) {
+ log.debug(
+ "Refusing to assign slot to execution {} because this deployment was "
+ + "superseded by another deployment",
+ deploymentHandle.getExecutionAttemptId());
+ releaseSlotIfPresent(logicalSlot);
+ }
+ return null;
+ }
+
+ // Throw the failure only if this deployment is still current (not outdated).
+ // This ensures that canceling a pending slot request does not fail
+ // a task which is about to be canceled.
+ if (throwable != null) {
+ throw new CompletionException(maybeWrapWithNoResourceAvailableException(throwable));
+ }
+
+ final Execution execution = optionalExecution.get();
+ if (!execution.tryAssignResource(logicalSlot)) {
+ throw new IllegalStateException(
+ "Could not assign resource "
+ + logicalSlot
+ + " to execution "
+ + execution
+ + '.');
+ }
+
+ // We only reserve the latest execution of an execution vertex, because reserving
+ // multiple slots for one execution vertex may cause problems. Besides that, slot
+ // reservation is for local recovery and therefore is only needed by streaming jobs, in
+ // which case an execution vertex will have only one current execution.
+ allocationReservationFunc.accept(
+ execution.getAttemptId().getExecutionVertexId(), logicalSlot.getAllocationId());
+
+ return logicalSlot;
+ };
+ }
+
+ private static void releaseSlotIfPresent(@Nullable final LogicalSlot logicalSlot) {
+ if (logicalSlot != null) {
+ logicalSlot.releaseSlot(null);
+ }
+ }
+
+ private static Throwable maybeWrapWithNoResourceAvailableException(final Throwable failure) {
+ final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(failure);
+ if (strippedThrowable instanceof TimeoutException) {
+ return new NoResourceAvailableException(
+ "Could not allocate the required slot within slot request timeout. "
+ + "Please make sure that the cluster has enough resources.",
+ failure);
+ } else {
+ return failure;
+ }
+ }
+
+ private Function<LogicalSlot, CompletableFuture<Void>> registerProducedPartitions(
+ final ExecutionDeploymentHandle deploymentHandle) {
+
+ return logicalSlot -> {
+ // a null logicalSlot means the slot assignment is skipped, in which case
+ // the produced partition registration process can be skipped as well
+ if (logicalSlot != null) {
+ final Execution execution =
+ getExecutionOrThrow(deploymentHandle.getExecutionAttemptId());
+ final CompletableFuture<Void> partitionRegistrationFuture =
+ execution.registerProducedPartitions(logicalSlot.getTaskManagerLocation());
+
+ return FutureUtils.orTimeout(
+ partitionRegistrationFuture,
+ partitionRegistrationTimeout.toMilliseconds(),
+ TimeUnit.MILLISECONDS,
+ mainThreadExecutor);
+ } else {
+ return FutureUtils.completedVoidFuture();
+ }
+ };
+ }
+
+ private BiFunction<Object, Throwable, Void> deployOrHandleError(
+ final ExecutionDeploymentHandle deploymentHandle) {
+
+ return (ignored, throwable) -> {
+ final ExecutionVertexVersion requiredVertexVersion =
+ deploymentHandle.getRequiredVertexVersion();
+ final Optional<Execution> optionalExecution =
+ getExecution(deploymentHandle.getExecutionAttemptId());
+
+ if (!optionalExecution.isPresent()
+ || optionalExecution.get().getState() != ExecutionState.SCHEDULED
+ || executionVertexVersioner.isModified(requiredVertexVersion)) {
+ if (throwable == null) {
+ log.debug(
+ "Refusing to deploy execution {} because this deployment was "
+ + "superseded by another deployment",
+ deploymentHandle.getExecutionAttemptId());
+ }
+ return null;
+ }
+
+ final Execution execution = optionalExecution.get();
+ if (throwable == null) {
+ deployTaskSafe(execution);
+ } else {
+ handleTaskDeploymentFailure(execution.getAttemptId(), throwable);
+ }
+ return null;
+ };
+ }
+
+ private void deployTaskSafe(final Execution execution) {
+ try {
+ executionOperations.deploy(execution);
+ } catch (Throwable e) {
+ handleTaskDeploymentFailure(execution.getAttemptId(), e);
+ }
+ }
+
+ private void handleTaskDeploymentFailure(
+ final ExecutionAttemptID executionAttemptId, final Throwable error) {
+
+ final Execution execution = getExecutionOrThrow(executionAttemptId);
+ executionOperations.markFailed(execution, error);
+ }
+
+ private Execution getExecutionOrThrow(ExecutionAttemptID executionAttemptId) {
+ return getExecution(executionAttemptId).get();
+ }
+
+ private Optional<Execution> getExecution(ExecutionAttemptID executionAttemptId) {
+ return Optional.ofNullable(executionRetriever.apply(executionAttemptId));
+ }
+
+ private static class ExecutionDeploymentHandle {
+
+ private final ExecutionSlotAssignment executionSlotAssignment;
+
+ private final ExecutionVertexVersion requiredVertexVersion;
+
+ ExecutionDeploymentHandle(
+ ExecutionSlotAssignment executionSlotAssignment,
+ final ExecutionVertexVersion requiredVertexVersion) {
+ this.executionSlotAssignment = checkNotNull(executionSlotAssignment);
+ this.requiredVertexVersion = checkNotNull(requiredVertexVersion);
+ }
+
+ ExecutionAttemptID getExecutionAttemptId() {
+ return executionSlotAssignment.getExecutionAttemptId();
+ }
+
+ CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
+ return executionSlotAssignment.getLogicalSlotFuture();
+ }
+
+ ExecutionVertexVersion getRequiredVertexVersion() {
+ return requiredVertexVersion;
+ }
+ }
+
+ /** Factory to instantiate the {@link DefaultExecutionDeployer}. */
+ public static class Factory implements ExecutionDeployer.Factory {
+
+ @Override
+ public DefaultExecutionDeployer createInstance(
+ Logger log,
+ ExecutionSlotAllocator executionSlotAllocator,
+ ExecutionOperations executionOperations,
+ ExecutionVertexVersioner executionVertexVersioner,
+ Time partitionRegistrationTimeout,
+ Function<ExecutionAttemptID, Execution> executionRetriever,
+ BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc,
+ ComponentMainThreadExecutor mainThreadExecutor) {
+ return new DefaultExecutionDeployer(
+ log,
+ executionSlotAllocator,
+ executionOperations,
+ executionVertexVersioner,
+ partitionRegistrationTimeout,
+ executionRetriever,
+ allocationReservationFunc,
+ mainThreadExecutor);
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionOperations.java
similarity index 63%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionOperations.java
index d7454c830bd..4bfe8766688 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionOperations.java
@@ -20,25 +20,26 @@
package org.apache.flink.runtime.scheduler;
import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.Execution;
import java.util.concurrent.CompletableFuture;
-/** Default implementation of {@link ExecutionVertexOperations}. */
-public class DefaultExecutionVertexOperations implements ExecutionVertexOperations {
+/** Default implementation of {@link ExecutionOperations}. */
+public class DefaultExecutionOperations implements ExecutionOperations {
@Override
- public void deploy(final ExecutionVertex executionVertex) throws JobException {
- executionVertex.deploy();
+ public void deploy(Execution execution) throws JobException {
+ execution.deploy();
}
@Override
- public CompletableFuture<?> cancel(final ExecutionVertex executionVertex) {
- return executionVertex.cancel();
+ public CompletableFuture<?> cancel(Execution execution) {
+ execution.cancel();
+ return execution.getReleaseFuture();
}
@Override
- public void markFailed(final ExecutionVertex executionVertex, final Throwable cause) {
- executionVertex.markFailed(cause);
+ public void markFailed(Execution execution, Throwable cause) {
+ execution.markFailed(cause);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index e7869b6c9a9..081cfa26987 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
@@ -41,9 +43,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot;
@@ -54,7 +54,6 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.topology.Vertex;
-import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
@@ -63,7 +62,6 @@ import org.slf4j.Logger;
import javax.annotation.Nullable;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -73,17 +71,12 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.BiFunction;
import java.util.function.Consumer;
-import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
/** The future default scheduler. */
public class DefaultScheduler extends SchedulerBase implements SchedulerOperations {
@@ -100,14 +93,12 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
protected final SchedulingStrategy schedulingStrategy;
- private final ExecutionVertexOperations executionVertexOperations;
+ private final ExecutionOperations executionOperations;
private final Set<ExecutionVertexID> verticesWaitingForRestart;
private final ShuffleMaster<?> shuffleMaster;
- private final Time rpcTimeout;
-
private final Map<AllocationID, Long> reservedAllocationRefCounters;
// once an execution vertex is assigned an allocation/slot, it will reserve the allocation
@@ -115,55 +106,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
// anymore. The reserved allocation information is needed for local recovery.
private final Map<ExecutionVertexID, AllocationID> reservedAllocationByExecutionVertex;
- DefaultScheduler(
- final Logger log,
- final JobGraph jobGraph,
- final Executor ioExecutor,
- final Configuration jobMasterConfiguration,
- final Consumer<ComponentMainThreadExecutor> startUpAction,
- final ScheduledExecutor delayExecutor,
- final ClassLoader userCodeLoader,
- final CheckpointsCleaner checkpointsCleaner,
- final CheckpointRecoveryFactory checkpointRecoveryFactory,
- final JobManagerJobMetricGroup jobManagerJobMetricGroup,
- final SchedulingStrategyFactory schedulingStrategyFactory,
- final FailoverStrategy.Factory failoverStrategyFactory,
- final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
- final ExecutionVertexOperations executionVertexOperations,
- final ExecutionVertexVersioner executionVertexVersioner,
- final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
- long initializationTimestamp,
- final ComponentMainThreadExecutor mainThreadExecutor,
- final JobStatusListener jobStatusListener,
- final ExecutionGraphFactory executionGraphFactory,
- final ShuffleMaster<?> shuffleMaster,
- final Time rpcTimeout)
- throws Exception {
- this(
- log,
- jobGraph,
- ioExecutor,
- jobMasterConfiguration,
- startUpAction,
- delayExecutor,
- userCodeLoader,
- checkpointsCleaner,
- checkpointRecoveryFactory,
- jobManagerJobMetricGroup,
- schedulingStrategyFactory,
- failoverStrategyFactory,
- restartBackoffTimeStrategy,
- executionVertexOperations,
- executionVertexVersioner,
- executionSlotAllocatorFactory,
- initializationTimestamp,
- mainThreadExecutor,
- jobStatusListener,
- executionGraphFactory,
- shuffleMaster,
- rpcTimeout,
- computeVertexParallelismStore(jobGraph));
- }
+ private final ExecutionDeployer executionDeployer;
protected DefaultScheduler(
final Logger log,
@@ -179,7 +122,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
final SchedulingStrategyFactory schedulingStrategyFactory,
final FailoverStrategy.Factory failoverStrategyFactory,
final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
- final ExecutionVertexOperations executionVertexOperations,
+ final ExecutionOperations executionOperations,
final ExecutionVertexVersioner executionVertexVersioner,
final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
long initializationTimestamp,
@@ -188,7 +131,8 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
final ExecutionGraphFactory executionGraphFactory,
final ShuffleMaster<?> shuffleMaster,
final Time rpcTimeout,
- final VertexParallelismStore vertexParallelismStore)
+ final VertexParallelismStore vertexParallelismStore,
+ final ExecutionDeployer.Factory executionDeployerFactory)
throws Exception {
super(
@@ -210,9 +154,8 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
this.delayExecutor = checkNotNull(delayExecutor);
this.userCodeLoader = checkNotNull(userCodeLoader);
- this.executionVertexOperations = checkNotNull(executionVertexOperations);
+ this.executionOperations = checkNotNull(executionOperations);
this.shuffleMaster = checkNotNull(shuffleMaster);
- this.rpcTimeout = checkNotNull(rpcTimeout);
this.reservedAllocationRefCounters = new HashMap<>();
this.reservedAllocationByExecutionVertex = new HashMap<>();
@@ -238,6 +181,17 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
this.verticesWaitingForRestart = new HashSet<>();
startUpAction.accept(mainThreadExecutor);
+
+ this.executionDeployer =
+ executionDeployerFactory.createInstance(
+ log,
+ executionSlotAllocator,
+ executionOperations,
+ executionVertexVersioner,
+ rpcTimeout,
+ id -> getExecutionGraph().getRegisteredExecutions().get(id),
+ this::startReserveAllocation,
+ mainThreadExecutor);
}
// ------------------------------------------------------------------------
@@ -253,6 +207,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
protected void cancelAllPendingSlotRequestsInternal() {
IterableUtils.toStream(getSchedulingTopology().getVertices())
.map(Vertex::getId)
+ .map(this::getCurrentExecutionIdOfVertex)
.forEach(executionSlotAllocator::cancel);
}
@@ -426,7 +381,9 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
private CompletableFuture<?> cancelTasksAsync(final Set<ExecutionVertexID> verticesToRestart) {
// clean up all the related pending requests to avoid that immediately returned slot
// is used to fulfill the pending requests of these tasks
- verticesToRestart.stream().forEach(executionSlotAllocator::cancel);
+ verticesToRestart.stream()
+ .map(this::getCurrentExecutionIdOfVertex)
+ .forEach(executionSlotAllocator::cancel);
final List<CompletableFuture<?>> cancelFutures =
verticesToRestart.stream()
@@ -441,7 +398,11 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
notifyCoordinatorOfCancellation(vertex);
- return executionVertexOperations.cancel(vertex);
+ return executionOperations.cancel(vertex.getCurrentExecutionAttempt());
+ }
+
+ private ExecutionAttemptID getCurrentExecutionIdOfVertex(ExecutionVertexID executionVertexId) {
+ return getExecutionVertex(executionVertexId).getCurrentExecutionAttempt().getAttemptId();
}
// ------------------------------------------------------------------------
@@ -450,142 +411,16 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
@Override
public void allocateSlotsAndDeploy(final List<ExecutionVertexID> verticesToDeploy) {
- validateDeploymentOptions(verticesToDeploy);
-
final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
executionVertexVersioner.recordVertexModifications(verticesToDeploy);
- transitionToScheduled(verticesToDeploy);
-
- final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
- allocateSlots(verticesToDeploy);
-
- final List<DeploymentHandle> deploymentHandles =
- createDeploymentHandles(requiredVersionByVertex, slotExecutionVertexAssignments);
-
- waitForAllSlotsAndDeploy(deploymentHandles);
- }
-
- private void validateDeploymentOptions(final Collection<ExecutionVertexID> verticesToDeploy) {
- verticesToDeploy.stream()
- .map(this::getExecutionVertex)
- .forEach(
- v ->
- checkState(
- v.getExecutionState() == ExecutionState.CREATED,
- "expected vertex %s to be in CREATED state, was: %s",
- v.getID(),
- v.getExecutionState()));
- }
-
- private List<SlotExecutionVertexAssignment> allocateSlots(
- final List<ExecutionVertexID> verticesToDeploy) {
- return executionSlotAllocator.allocateSlotsFor(verticesToDeploy);
- }
-
- private static List<DeploymentHandle> createDeploymentHandles(
- final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex,
- final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments) {
-
- return slotExecutionVertexAssignments.stream()
- .map(
- slotExecutionVertexAssignment -> {
- final ExecutionVertexID executionVertexId =
- slotExecutionVertexAssignment.getExecutionVertexId();
- return new DeploymentHandle(
- requiredVersionByVertex.get(executionVertexId),
- slotExecutionVertexAssignment);
- })
- .collect(Collectors.toList());
- }
-
- private void waitForAllSlotsAndDeploy(final List<DeploymentHandle> deploymentHandles) {
- FutureUtils.assertNoException(
- assignAllResourcesAndRegisterProducedPartitions(deploymentHandles)
- .handle(deployAll(deploymentHandles)));
- }
-
- private CompletableFuture<Void> assignAllResourcesAndRegisterProducedPartitions(
- final List<DeploymentHandle> deploymentHandles) {
- final List<CompletableFuture<Void>> resultFutures = new ArrayList<>();
- for (DeploymentHandle deploymentHandle : deploymentHandles) {
- final CompletableFuture<Void> resultFuture =
- deploymentHandle
- .getSlotExecutionVertexAssignment()
- .getLogicalSlotFuture()
- .handle(assignResource(deploymentHandle))
- .thenCompose(registerProducedPartitions(deploymentHandle))
- .handle(
- (ignore, throwable) -> {
- if (throwable != null) {
- handleTaskDeploymentFailure(
- deploymentHandle.getExecutionVertexId(),
- throwable);
- }
- return null;
- });
-
- resultFutures.add(resultFuture);
- }
- return FutureUtils.waitForAll(resultFutures);
- }
-
- private BiFunction<Void, Throwable, Void> deployAll(
- final List<DeploymentHandle> deploymentHandles) {
- return (ignored, throwable) -> {
- propagateIfNonNull(throwable);
- for (final DeploymentHandle deploymentHandle : deploymentHandles) {
- final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
- deploymentHandle.getSlotExecutionVertexAssignment();
- final CompletableFuture<LogicalSlot> slotAssigned =
- slotExecutionVertexAssignment.getLogicalSlotFuture();
- checkState(slotAssigned.isDone());
-
- FutureUtils.assertNoException(
- slotAssigned.handle(deployOrHandleError(deploymentHandle)));
- }
- return null;
- };
- }
-
- private static void propagateIfNonNull(final Throwable throwable) {
- if (throwable != null) {
- throw new CompletionException(throwable);
- }
- }
+ final List<Execution> executionsToDeploy =
+ verticesToDeploy.stream()
+ .map(this::getExecutionVertex)
+ .map(ExecutionVertex::getCurrentExecutionAttempt)
+ .collect(Collectors.toList());
- private BiFunction<LogicalSlot, Throwable, LogicalSlot> assignResource(
- final DeploymentHandle deploymentHandle) {
- final ExecutionVertexVersion requiredVertexVersion =
- deploymentHandle.getRequiredVertexVersion();
- final ExecutionVertexID executionVertexId = deploymentHandle.getExecutionVertexId();
-
- return (logicalSlot, throwable) -> {
- if (executionVertexVersioner.isModified(requiredVertexVersion)) {
- if (throwable == null) {
- log.debug(
- "Refusing to assign slot to execution vertex {} because this deployment was "
- + "superseded by another deployment",
- executionVertexId);
- releaseSlotIfPresent(logicalSlot);
- }
- return null;
- }
-
- // throw exception only if the execution version is not outdated.
- // this ensures that canceling a pending slot request does not fail
- // a task which is about to cancel in #restartTasksWithDelay(...)
- if (throwable != null) {
- throw new CompletionException(maybeWrapWithNoResourceAvailableException(throwable));
- }
-
- final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
- executionVertex.tryAssignResource(logicalSlot);
-
- startReserveAllocation(executionVertexId, logicalSlot.getAllocationId());
-
- return logicalSlot;
- };
+ executionDeployer.allocateSlotsAndDeploy(executionsToDeploy, requiredVersionByVertex);
}
private void startReserveAllocation(
@@ -608,88 +443,6 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
}
}
- private Function<LogicalSlot, CompletableFuture<Void>> registerProducedPartitions(
- final DeploymentHandle deploymentHandle) {
- final ExecutionVertexID executionVertexId = deploymentHandle.getExecutionVertexId();
-
- return logicalSlot -> {
- // a null logicalSlot means the slot assignment is skipped, in which case
- // the produced partition registration process can be skipped as well
- if (logicalSlot != null) {
- final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
-
- final CompletableFuture<Void> partitionRegistrationFuture =
- executionVertex
- .getCurrentExecutionAttempt()
- .registerProducedPartitions(logicalSlot.getTaskManagerLocation());
-
- return FutureUtils.orTimeout(
- partitionRegistrationFuture,
- rpcTimeout.toMilliseconds(),
- TimeUnit.MILLISECONDS,
- getMainThreadExecutor());
- } else {
- return FutureUtils.completedVoidFuture();
- }
- };
- }
-
- private void releaseSlotIfPresent(@Nullable final LogicalSlot logicalSlot) {
- if (logicalSlot != null) {
- logicalSlot.releaseSlot(null);
- }
- }
-
- private void handleTaskDeploymentFailure(
- final ExecutionVertexID executionVertexId, final Throwable error) {
- executionVertexOperations.markFailed(getExecutionVertex(executionVertexId), error);
- }
-
- private static Throwable maybeWrapWithNoResourceAvailableException(final Throwable failure) {
- final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(failure);
- if (strippedThrowable instanceof TimeoutException) {
- return new NoResourceAvailableException(
- "Could not allocate the required slot within slot request timeout. "
- + "Please make sure that the cluster has enough resources.",
- failure);
- } else {
- return failure;
- }
- }
-
- private BiFunction<Object, Throwable, Void> deployOrHandleError(
- final DeploymentHandle deploymentHandle) {
- final ExecutionVertexVersion requiredVertexVersion =
- deploymentHandle.getRequiredVertexVersion();
- final ExecutionVertexID executionVertexId = requiredVertexVersion.getExecutionVertexId();
-
- return (ignored, throwable) -> {
- if (executionVertexVersioner.isModified(requiredVertexVersion)) {
- log.debug(
- "Refusing to deploy execution vertex {} because this deployment was "
- + "superseded by another deployment",
- executionVertexId);
- return null;
- }
-
- if (throwable == null) {
- deployTaskSafe(executionVertexId);
- } else {
- handleTaskDeploymentFailure(executionVertexId, throwable);
- }
- return null;
- };
- }
-
- private void deployTaskSafe(final ExecutionVertexID executionVertexId) {
- try {
- final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
- executionVertexOperations.deploy(executionVertex);
- } catch (Throwable e) {
- handleTaskDeploymentFailure(executionVertexId, e);
- }
- }
-
private void notifyCoordinatorOfCancellation(ExecutionVertex vertex) {
// this method makes a best effort to filter out duplicate notifications, meaning cases
// where
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index fda0d8a7928..de7d56315bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -47,6 +47,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import static org.apache.flink.runtime.scheduler.DefaultSchedulerComponents.createSchedulerComponents;
+import static org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore;
/** Factory for {@link DefaultScheduler}. */
public class DefaultSchedulerFactory implements SchedulerNGFactory {
@@ -130,7 +131,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
schedulerComponents.getSchedulingStrategyFactory(),
FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
restartBackoffTimeStrategy,
- new DefaultExecutionVertexOperations(),
+ new DefaultExecutionOperations(),
new ExecutionVertexVersioner(),
schedulerComponents.getAllocatorFactory(),
initializationTimestamp,
@@ -145,7 +146,9 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
},
executionGraphFactory,
shuffleMaster,
- rpcTimeout);
+ rpcTimeout,
+ computeVertexParallelismStore(jobGraph),
+ new DefaultExecutionDeployer.Factory());
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentHandle.java
deleted file mode 100644
index 4724c37bd6d..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentHandle.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.runtime.scheduler;
-
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * This class is a tuple holding the information necessary to deploy an {@link ExecutionVertex}.
- *
- * <p>The tuple consists of:
- *
- * <ul>
- * <li>{@link ExecutionVertexVersion}
- * <li>{@link SlotExecutionVertexAssignment}
- * </ul>
- */
-class DeploymentHandle {
-
- private final ExecutionVertexVersion requiredVertexVersion;
-
- private final SlotExecutionVertexAssignment slotExecutionVertexAssignment;
-
- public DeploymentHandle(
- final ExecutionVertexVersion requiredVertexVersion,
- final SlotExecutionVertexAssignment slotExecutionVertexAssignment) {
-
- this.requiredVertexVersion = Preconditions.checkNotNull(requiredVertexVersion);
- this.slotExecutionVertexAssignment =
- Preconditions.checkNotNull(slotExecutionVertexAssignment);
- }
-
- public ExecutionVertexID getExecutionVertexId() {
- return requiredVertexVersion.getExecutionVertexId();
- }
-
- public ExecutionVertexVersion getRequiredVertexVersion() {
- return requiredVertexVersion;
- }
-
- public SlotExecutionVertexAssignment getSlotExecutionVertexAssignment() {
- return slotExecutionVertexAssignment;
- }
-
- public Optional<LogicalSlot> getLogicalSlot() {
- final CompletableFuture<LogicalSlot> logicalSlotFuture =
- slotExecutionVertexAssignment.getLogicalSlotFuture();
- Preconditions.checkState(
- logicalSlotFuture.isDone(), "method can only be called after slot future is done");
-
- if (logicalSlotFuture.isCompletedExceptionally()) {
- return Optional.empty();
- }
- return Optional.ofNullable(logicalSlotFuture.getNow(null));
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java
new file mode 100644
index 00000000000..4fc979e80ba
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/** This deployer is responsible for deploying executions. */
+interface ExecutionDeployer {
+
+ /**
+ * Allocate slots and deploy executions.
+ *
+ * @param executionsToDeploy executions to deploy
+ * @param requiredVersionByVertex required versions of the execution vertices. If the actual
+ * version does not match, the deployment of the execution will be rejected.
+ */
+ void allocateSlotsAndDeploy(
+ final List<Execution> executionsToDeploy,
+ final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex);
+
+ /** Factory to instantiate the {@link ExecutionDeployer}. */
+ interface Factory {
+
+ /**
+ * Instantiate an {@link ExecutionDeployer} with the given params. Note that the version of
+ * an execution vertex will be recorded before scheduling executions for it. The version may
+ * change if a global failure happens, or if the job is canceled, or if the execution vertex
+ * is restarted when all its current executions are FAILED/CANCELED. Once the version is
+ * changed, the previously triggered execution deployment will be skipped.
+ *
+ * @param log the logger
+ * @param executionSlotAllocator the allocator to allocate slots
+ * @param executionOperations the operations of executions
+ * @param executionVertexVersioner the versioner which records the versions of execution
+ * vertices.
+ * @param partitionRegistrationTimeout timeout of partition registration
+ * @param executionRetriever retriever to get executions
+ * @param allocationReservationFunc function to reserve allocations for local recovery
+ * @param mainThreadExecutor the main thread executor
+ * @return an instantiated {@link ExecutionDeployer}
+ */
+ ExecutionDeployer createInstance(
+ final Logger log,
+ final ExecutionSlotAllocator executionSlotAllocator,
+ final ExecutionOperations executionOperations,
+ final ExecutionVertexVersioner executionVertexVersioner,
+ final Time partitionRegistrationTimeout,
+ final Function<ExecutionAttemptID, Execution> executionRetriever,
+ final BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc,
+ final ComponentMainThreadExecutor mainThreadExecutor);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionOperations.java
similarity index 54%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionOperations.java
index 11cd1c9b378..6f8581dc7df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionOperations.java
@@ -20,16 +20,34 @@
package org.apache.flink.runtime.scheduler;
import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.Execution;
import java.util.concurrent.CompletableFuture;
-/** Operations on the {@link ExecutionVertex}. */
-public interface ExecutionVertexOperations {
+/** Operations on the {@link Execution}. */
+public interface ExecutionOperations {
- void deploy(ExecutionVertex executionVertex) throws JobException;
+ /**
+ * Deploy the execution.
+ *
+ * @param execution to deploy.
+ * @throws JobException if the execution cannot be deployed to the assigned resource
+ */
+ void deploy(Execution execution) throws JobException;
- CompletableFuture<?> cancel(ExecutionVertex executionVertex);
+ /**
+ * Cancel the execution.
+ *
+ * @param execution to cancel
+ * @return Future which completes when the cancellation is done
+ */
+ CompletableFuture<?> cancel(Execution execution);
- void markFailed(ExecutionVertex executionVertex, Throwable cause);
+ /**
+ * Mark the execution as FAILED.
+ *
+ * @param execution to mark as failed.
+ * @param cause of the execution failure
+ */
+ void markFailed(Execution execution, Throwable cause);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocator.java
index 5c4c9e3ccbf..526ebae99a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocator.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.scheduler;
import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import java.util.List;
@@ -29,15 +29,16 @@ public interface ExecutionSlotAllocator {
/**
* Allocate slots for the given executions.
*
- * @param executionVertexIds Execution vertices to allocate slots for
+ * @param executionAttemptIds executions to allocate slots for
+ * @return List of slot assignments to the executions
*/
- List<SlotExecutionVertexAssignment> allocateSlotsFor(
- List<ExecutionVertexID> executionVertexIds);
+ List<ExecutionSlotAssignment> allocateSlotsFor(List<ExecutionAttemptID> executionAttemptIds);
/**
- * Cancel an ongoing slot request.
+ * Cancel the ongoing slot request of the given {@link Execution}.
*
- * @param executionVertexId identifying which slot request should be canceled.
+ * @param executionAttemptId identifying the {@link Execution} of which the slot request should
+ * be canceled.
*/
- void cancel(ExecutionVertexID executionVertexId);
+ void cancel(ExecutionAttemptID executionAttemptId);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotExecutionVertexAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAssignment.java
similarity index 69%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotExecutionVertexAssignment.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAssignment.java
index d442a41a6de..435724bf8f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotExecutionVertexAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAssignment.java
@@ -18,29 +18,30 @@
package org.apache.flink.runtime.scheduler;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkNotNull;
-/** The slot assignment for a {@link ExecutionVertex}. */
-class SlotExecutionVertexAssignment {
+/** The slot assignment for an {@link Execution}. */
+class ExecutionSlotAssignment {
- private final ExecutionVertexID executionVertexId;
+ private final ExecutionAttemptID executionAttemptId;
private final CompletableFuture<LogicalSlot> logicalSlotFuture;
- SlotExecutionVertexAssignment(
- ExecutionVertexID executionVertexId, CompletableFuture<LogicalSlot> logicalSlotFuture) {
- this.executionVertexId = checkNotNull(executionVertexId);
+ ExecutionSlotAssignment(
+ ExecutionAttemptID executionAttemptId,
+ CompletableFuture<LogicalSlot> logicalSlotFuture) {
+ this.executionAttemptId = checkNotNull(executionAttemptId);
this.logicalSlotFuture = checkNotNull(logicalSlotFuture);
}
- ExecutionVertexID getExecutionVertexId() {
- return executionVertexId;
+ ExecutionAttemptID getExecutionAttemptId() {
+ return executionAttemptId;
}
CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
index 24f0444df7d..656ce2a6fac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
@@ -223,7 +223,7 @@ class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
}
} else {
LOG.debug(
- "No SlotExecutionVertexAssignment for logical {} from physical {}}",
+ "No request for logical {} from physical {}}",
logicalSlotRequestId,
physicalSlotRequestId);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
index 0b36a4aa35d..67c8b3bb970 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
@@ -45,6 +47,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
/**
* Allocates {@link LogicalSlot}s from physical shared slots.
@@ -94,6 +97,34 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator {
this.sharedSlots = new IdentityHashMap<>();
}
+ @Override
+ public List<ExecutionSlotAssignment> allocateSlotsFor(
+ List<ExecutionAttemptID> executionAttemptIds) {
+
+ final Map<ExecutionVertexID, ExecutionAttemptID> vertexIdToExecutionId = new HashMap<>();
+ executionAttemptIds.forEach(
+ executionId ->
+ vertexIdToExecutionId.put(executionId.getExecutionVertexId(), executionId));
+
+ checkState(
+ vertexIdToExecutionId.size() == executionAttemptIds.size(),
+ "SlotSharingExecutionSlotAllocator does not support one execution vertex to have multiple concurrent executions");
+
+ final List<ExecutionVertexID> vertexIds =
+ executionAttemptIds.stream()
+ .map(ExecutionAttemptID::getExecutionVertexId)
+ .collect(Collectors.toList());
+
+ return allocateSlotsForVertices(vertexIds).stream()
+ .map(
+ vertexAssignment ->
+ new ExecutionSlotAssignment(
+ vertexIdToExecutionId.get(
+ vertexAssignment.getExecutionVertexId()),
+ vertexAssignment.getLogicalSlotFuture()))
+ .collect(Collectors.toList());
+ }
+
/**
* Creates logical {@link SlotExecutionVertexAssignment}s from physical shared slots.
*
@@ -117,8 +148,7 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator {
*
* @param executionVertexIds Execution vertices to allocate slots for
*/
- @Override
- public List<SlotExecutionVertexAssignment> allocateSlotsFor(
+ private List<SlotExecutionVertexAssignment> allocateSlotsForVertices(
List<ExecutionVertexID> executionVertexIds) {
SharedSlotProfileRetriever sharedSlotProfileRetriever =
@@ -149,8 +179,8 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator {
}
@Override
- public void cancel(ExecutionVertexID executionVertexId) {
- cancelLogicalSlotRequest(executionVertexId, null);
+ public void cancel(ExecutionAttemptID executionAttemptId) {
+ cancelLogicalSlotRequest(executionAttemptId.getExecutionVertexId(), null);
}
private void cancelLogicalSlotRequest(ExecutionVertexID executionVertexId, Throwable cause) {
@@ -284,4 +314,27 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator {
});
}
}
+
+ /** The slot assignment for an {@link ExecutionVertex}. */
+ private static class SlotExecutionVertexAssignment {
+
+ private final ExecutionVertexID executionVertexId;
+
+ private final CompletableFuture<LogicalSlot> logicalSlotFuture;
+
+ SlotExecutionVertexAssignment(
+ ExecutionVertexID executionVertexId,
+ CompletableFuture<LogicalSlot> logicalSlotFuture) {
+ this.executionVertexId = checkNotNull(executionVertexId);
+ this.logicalSlotFuture = checkNotNull(logicalSlotFuture);
+ }
+
+ ExecutionVertexID getExecutionVertexId() {
+ return executionVertexId;
+ }
+
+ CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
+ return logicalSlotFuture;
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index 2c99ecc9c19..6507774b3e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -41,10 +41,11 @@ import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalResult;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalVertex;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.DefaultExecutionDeployer;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionOperations;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
-import org.apache.flink.runtime.scheduler.ExecutionVertexOperations;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
@@ -93,7 +94,7 @@ public class AdaptiveBatchScheduler extends DefaultScheduler implements Schedule
final SchedulingStrategyFactory schedulingStrategyFactory,
final FailoverStrategy.Factory failoverStrategyFactory,
final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
- final ExecutionVertexOperations executionVertexOperations,
+ final ExecutionOperations executionOperations,
final ExecutionVertexVersioner executionVertexVersioner,
final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
long initializationTimestamp,
@@ -120,7 +121,7 @@ public class AdaptiveBatchScheduler extends DefaultScheduler implements Schedule
schedulingStrategyFactory,
failoverStrategyFactory,
restartBackoffTimeStrategy,
- executionVertexOperations,
+ executionOperations,
executionVertexVersioner,
executionSlotAllocatorFactory,
initializationTimestamp,
@@ -130,7 +131,8 @@ public class AdaptiveBatchScheduler extends DefaultScheduler implements Schedule
shuffleMaster,
rpcTimeout,
computeVertexParallelismStoreForDynamicGraph(
- jobGraph.getVertices(), defaultMaxParallelism));
+ jobGraph.getVertices(), defaultMaxParallelism),
+ new DefaultExecutionDeployer.Factory());
this.logicalTopology = DefaultLogicalTopology.fromJobGraph(jobGraph);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
index 390273534a3..0cc501203b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
@@ -48,7 +48,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
-import org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations;
+import org.apache.flink.runtime.scheduler.DefaultExecutionOperations;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
@@ -161,7 +161,7 @@ public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory {
new VertexwiseSchedulingStrategy.Factory(),
FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
restartBackoffTimeStrategy,
- new DefaultExecutionVertexOperations(),
+ new DefaultExecutionOperations(),
new ExecutionVertexVersioner(),
allocatorFactory,
initializationTimestamp,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java
new file mode 100644
index 00000000000..96c89093b92
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.TestingShuffleMaster;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.IterableUtils;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link DefaultExecutionDeployer}. */
+@ExtendWith(TestLoggerExtension.class)
+class DefaultExecutionDeployerTest {
+
+    private ScheduledExecutorService executor;
+    private ComponentMainThreadExecutor mainThreadExecutor;
+    private TestExecutionOperationsDecorator testExecutionOperations;
+    private ExecutionVertexVersioner executionVertexVersioner;
+    private TestExecutionSlotAllocator testExecutionSlotAllocator;
+    private TestingShuffleMaster shuffleMaster;
+    private TestingJobMasterPartitionTracker partitionTracker;
+    private Time partitionRegistrationTimeout;
+
+    @BeforeEach
+    void setUp() {
+        executor = Executors.newSingleThreadScheduledExecutor();
+        mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
+        // The delegate below is a no-op; tests assert on what the decorator records instead
+        // (e.g. deployed and failed executions).
+        testExecutionOperations =
+                new TestExecutionOperationsDecorator(
+                        new ExecutionOperations() {
+                            @Override
+                            public void deploy(Execution execution) {}
+
+                            @Override
+                            public CompletableFuture<?> cancel(Execution execution) {
+                                // a completed future instead of null, so that callers which
+                                // chain on the cancellation result cannot hit an NPE
+                                return CompletableFuture.completedFuture(null);
+                            }
+
+                            @Override
+                            public void markFailed(Execution execution, Throwable cause) {}
+                        });
+        executionVertexVersioner = new ExecutionVertexVersioner();
+        testExecutionSlotAllocator = new TestExecutionSlotAllocator();
+        shuffleMaster = new TestingShuffleMaster();
+        partitionTracker = new TestingJobMasterPartitionTracker();
+        partitionRegistrationTimeout = Time.milliseconds(5000);
+    }
+
+    @AfterEach
+    void tearDown() {
+        if (executor != null) {
+            ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, executor);
+        }
+    }
+
+    /** Verifies that deploying a single-vertex job deploys its only execution. */
+    @Test
+    void testDeployTasks() throws Exception {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+
+        deployTasks(executionDeployer, executionGraph);
+
+        assertThat(testExecutionOperations.getDeployedExecutions())
+                .containsExactly(getAnyExecution(executionGraph).getAttemptId());
+    }
+
+    /**
+     * Verifies that no execution is deployed before the slot requests of all executions passed
+     * to the deployer in one call have been fulfilled.
+     */
+    @Test
+    void testDeployTasksOnlyIfAllSlotRequestsAreFulfilled() throws Exception {
+        final JobGraph jobGraph = singleJobVertexJobGraph(4);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+
+        testExecutionSlotAllocator.disableAutoCompletePendingRequests();
+
+        deployTasks(executionDeployer, executionGraph);
+
+        assertThat(testExecutionOperations.getDeployedExecutions()).isEmpty();
+
+        // fulfilling only one of the four slot requests must not trigger any deployment
+        final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId();
+        testExecutionSlotAllocator.completePendingRequest(attemptId);
+        assertThat(testExecutionOperations.getDeployedExecutions()).isEmpty();
+
+        testExecutionSlotAllocator.completePendingRequests();
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(4);
+    }
+
+    /** Verifies that an execution whose deployment fails is marked as failed. */
+    @Test
+    void testDeploymentFailures() throws Exception {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+
+        testExecutionOperations.enableFailDeploy();
+
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        deployTasks(executionDeployer, executionGraph);
+
+        assertThat(testExecutionOperations.getFailedExecutions())
+                .containsExactly(getAnyExecution(executionGraph).getAttemptId());
+    }
+
+    /** Verifies that only the execution whose slot request timed out is marked as failed. */
+    @Test
+    void testSlotAllocationTimeout() throws Exception {
+        final JobGraph jobGraph = singleJobVertexJobGraph(2);
+
+        testExecutionSlotAllocator.disableAutoCompletePendingRequests();
+
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        deployTasks(executionDeployer, executionGraph);
+
+        assertThat(testExecutionSlotAllocator.getPendingRequests()).hasSize(2);
+
+        final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId();
+        testExecutionSlotAllocator.timeoutPendingRequest(attemptId);
+
+        assertThat(testExecutionOperations.getFailedExecutions()).containsExactly(attemptId);
+    }
+
+    /**
+     * Verifies that an execution is not deployed if its vertex was modified, i.e. the recorded
+     * vertex version became outdated, while its slot request was pending.
+     */
+    @Test
+    void testSkipDeploymentIfVertexVersionOutdated() throws Exception {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+
+        testExecutionSlotAllocator.disableAutoCompletePendingRequests();
+
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        deployTasks(executionDeployer, executionGraph);
+
+        final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId();
+
+        executionVertexVersioner.recordModification(attemptId.getExecutionVertexId());
+        testExecutionSlotAllocator.completePendingRequests();
+
+        assertThat(testExecutionOperations.getDeployedExecutions()).isEmpty();
+    }
+
+    /**
+     * Verifies that the slot allocated for an execution whose vertex version became outdated is
+     * returned instead of being used for deployment.
+     */
+    @Test
+    void testReleaseSlotIfVertexVersionOutdated() throws Exception {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+
+        testExecutionSlotAllocator.disableAutoCompletePendingRequests();
+
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        deployTasks(executionDeployer, executionGraph);
+
+        final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId();
+
+        executionVertexVersioner.recordModification(attemptId.getExecutionVertexId());
+        testExecutionSlotAllocator.completePendingRequests();
+
+        assertThat(testExecutionSlotAllocator.getReturnedSlots()).hasSize(1);
+    }
+
+    /** Verifies that executions which have already left the CREATED state cannot be deployed. */
+    @Test
+    void testDeployOnlyIfVertexIsCreated() throws Exception {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+
+        // deploy once to transition the tasks out from CREATED state
+        deployTasks(executionDeployer, executionGraph);
+
+        // deploying a non-CREATED vertex again results in an IllegalStateException
+        assertThatThrownBy(() -> deployTasks(executionDeployer, executionGraph))
+                .as("IllegalStateException should happen")
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    /**
+     * Verifies that executions are only deployed, and produced partitions only tracked, after
+     * the registration of the produced partitions with the shuffle master has completed.
+     */
+    @Test
+    void testDeploymentWaitForProducedPartitionRegistration() throws Exception {
+        shuffleMaster.setAutoCompleteRegistration(false);
+
+        final List<ResultPartitionID> trackedPartitions = new ArrayList<>();
+        partitionTracker.setStartTrackingPartitionsConsumer(
+                (resourceID, resultPartitionDeploymentDescriptor) ->
+                        trackedPartitions.add(
+                                resultPartitionDeploymentDescriptor
+                                        .getShuffleDescriptor()
+                                        .getResultPartitionID()));
+
+        final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+
+        deployTasks(executionDeployer, executionGraph);
+
+        assertThat(trackedPartitions).isEmpty();
+        assertThat(testExecutionOperations.getDeployedExecutions()).isEmpty();
+
+        shuffleMaster.completeAllPendingRegistrations();
+        assertThat(trackedPartitions).hasSize(1);
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);
+    }
+
+    /** Verifies that a failed partition registration marks the producing execution as failed. */
+    @Test
+    void testFailedProducedPartitionRegistration() throws Exception {
+        shuffleMaster.setAutoCompleteRegistration(false);
+
+        final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+
+        deployTasks(executionDeployer, executionGraph);
+
+        assertThat(testExecutionOperations.getFailedExecutions()).isEmpty();
+
+        shuffleMaster.failAllPendingRegistrations();
+        assertThat(testExecutionOperations.getFailedExecutions()).hasSize(1);
+    }
+
+    /**
+     * Verifies that an exception thrown directly by the shuffle master when registering a
+     * produced partition marks the producing execution as failed.
+     */
+    @Test
+    void testDirectExceptionOnProducedPartitionRegistration() throws Exception {
+        shuffleMaster.setThrowExceptionalOnRegistration(true);
+
+        final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+
+        deployTasks(executionDeployer, executionGraph);
+
+        assertThat(testExecutionOperations.getFailedExecutions()).hasSize(1);
+    }
+
+    /**
+     * Verifies that the producing execution is marked as failed if the registration of its
+     * produced partition does not complete within the partition registration timeout.
+     */
+    @Test
+    void testProducedPartitionRegistrationTimeout() throws Exception {
+        ScheduledExecutorService scheduledExecutorService = null;
+        try {
+            partitionRegistrationTimeout = Time.milliseconds(1);
+
+            scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+            mainThreadExecutor =
+                    ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                            scheduledExecutorService);
+
+            shuffleMaster.setAutoCompleteRegistration(false);
+
+            final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
+            final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+            final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+
+            deployTasks(executionDeployer, executionGraph);
+
+            testExecutionOperations.awaitFailedExecutions(1);
+        } finally {
+            if (scheduledExecutorService != null) {
+                ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, scheduledExecutorService);
+            }
+        }
+    }
+
+    /** Creates a streaming job graph with a single job vertex of parallelism 1. */
+    private static JobGraph singleNonParallelJobVertexJobGraph() {
+        return singleJobVertexJobGraph(1);
+    }
+
+    /** Creates a streaming job graph with a single job vertex of the given parallelism. */
+    private static JobGraph singleJobVertexJobGraph(final int parallelism) {
+        final JobVertex vertex = new JobVertex("source");
+        vertex.setInvokableClass(NoOpInvokable.class);
+        vertex.setParallelism(parallelism);
+        return JobGraphTestUtils.streamingJobGraph(vertex);
+    }
+
+    /**
+     * Creates a streaming job graph with a source vertex connected to a sink vertex through a
+     * POINTWISE, PIPELINED result partition. Neither vertex sets an explicit parallelism.
+     */
+    private static JobGraph nonParallelSourceSinkJobGraph() {
+        final JobVertex source = new JobVertex("source");
+        source.setInvokableClass(NoOpInvokable.class);
+
+        final JobVertex sink = new JobVertex("sink");
+        sink.setInvokableClass(NoOpInvokable.class);
+
+        sink.connectNewDataSetAsInput(
+                source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+        return JobGraphTestUtils.streamingJobGraph(source, sink);
+    }
+
+    /**
+     * Creates and starts an execution graph for the given job graph, backed by this test's
+     * shuffle master and partition tracker. Internal failure notifications are ignored.
+     */
+    private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
+        final ExecutionGraph executionGraph =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .setShuffleMaster(shuffleMaster)
+                        .setPartitionTracker(partitionTracker)
+                        .build(executor);
+
+        executionGraph.setInternalTaskFailuresListener(
+                new InternalFailuresListener() {
+                    @Override
+                    public void notifyTaskFailure(
+                            ExecutionAttemptID attemptId,
+                            Throwable t,
+                            boolean cancelTask,
+                            boolean releasePartitions) {}
+
+                    @Override
+                    public void notifyGlobalFailure(Throwable t) {}
+                });
+        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+        return executionGraph;
+    }
+
+    /**
+     * Creates the {@link DefaultExecutionDeployer} under test from this test's slot allocator,
+     * execution operations, vertex versioner, partition registration timeout and main thread
+     * executor. Executions are looked up among those registered in the given execution graph;
+     * the two-argument callback is a no-op.
+     */
+    private ExecutionDeployer createExecutionDeployer(ExecutionGraph executionGraph) {
+        return new DefaultExecutionDeployer.Factory()
+                .createInstance(
+                        LoggerFactory.getLogger(DefaultExecutionDeployer.class),
+                        testExecutionSlotAllocator,
+                        testExecutionOperations,
+                        executionVertexVersioner,
+                        partitionRegistrationTimeout,
+                        id -> executionGraph.getRegisteredExecutions().get(id),
+                        (ignored1, ignored2) -> {},
+                        mainThreadExecutor);
+    }
+
+    /** Deploys the current execution attempts of all execution vertices of the given graph. */
+    private void deployTasks(ExecutionDeployer executionDeployer, ExecutionGraph executionGraph) {
+        deployTasks(
+                executionDeployer,
+                IterableUtils.toStream(executionGraph.getAllExecutionVertices())
+                        .map(ExecutionVertex::getCurrentExecutionAttempt)
+                        .collect(Collectors.toList()));
+    }
+
+    /**
+     * Records a modification for the vertices of the given executions and lets the deployer
+     * allocate slots for and deploy the executions with the resulting vertex versions.
+     */
+    private void deployTasks(ExecutionDeployer executionDeployer, List<Execution> executions) {
+        final Set<ExecutionVertexID> executionVertexIds =
+                executions.stream()
+                        .map(Execution::getAttemptId)
+                        .map(ExecutionAttemptID::getExecutionVertexId)
+                        .collect(Collectors.toSet());
+
+        executionDeployer.allocateSlotsAndDeploy(
+                executions, executionVertexVersioner.recordVertexModifications(executionVertexIds));
+    }
+
+    /** Returns an arbitrary execution among those registered in the given graph. */
+    private static Execution getAnyExecution(ExecutionGraph executionGraph) {
+        return executionGraph.getRegisteredExecutions().values().iterator().next();
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index cea1d98f880..733dbbc3d6e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -170,7 +170,7 @@ public class DefaultSchedulerTest extends TestLogger {
private TestRestartBackoffTimeStrategy testRestartBackoffTimeStrategy;
- private TestExecutionVertexOperationsDecorator testExecutionVertexOperations;
+ private TestExecutionOperationsDecorator testExecutionOperations;
private ExecutionVertexVersioner executionVertexVersioner;
@@ -193,8 +193,8 @@ public class DefaultSchedulerTest extends TestLogger {
testRestartBackoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, 0);
- testExecutionVertexOperations =
- new TestExecutionVertexOperationsDecorator(new DefaultExecutionVertexOperations());
+ testExecutionOperations =
+ new TestExecutionOperationsDecorator(new DefaultExecutionOperations());
executionVertexVersioner = new ExecutionVertexVersioner();
@@ -227,7 +227,7 @@ public class DefaultSchedulerTest extends TestLogger {
createSchedulerAndStartScheduling(jobGraph);
final List<ExecutionVertexID> deployedExecutionVertices =
- testExecutionVertexOperations.getDeployedVertices();
+ testExecutionOperations.getDeployedVertices();
final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
assertThat(deployedExecutionVertices, contains(executionVertexId));
@@ -281,13 +281,13 @@ public class DefaultSchedulerTest extends TestLogger {
new ExecutionVertexID(onlyJobVertexId, 3));
schedulingStrategy.schedule(verticesToSchedule);
- assertThat(testExecutionVertexOperations.getDeployedVertices(), hasSize(0));
+ assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
testExecutionSlotAllocator.completePendingRequest(verticesToSchedule.get(0));
- assertThat(testExecutionVertexOperations.getDeployedVertices(), hasSize(0));
+ assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
testExecutionSlotAllocator.completePendingRequests();
- assertThat(testExecutionVertexOperations.getDeployedVertices(), hasSize(4));
+ assertThat(testExecutionOperations.getDeployedVertices(), hasSize(4));
}
@Test
@@ -315,7 +315,7 @@ public class DefaultSchedulerTest extends TestLogger {
schedulingStrategy.schedule(desiredScheduleOrder);
final List<ExecutionVertexID> deployedExecutionVertices =
- testExecutionVertexOperations.getDeployedVertices();
+ testExecutionOperations.getDeployedVertices();
assertEquals(desiredScheduleOrder, deployedExecutionVertices);
}
@@ -325,15 +325,15 @@ public class DefaultSchedulerTest extends TestLogger {
final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
- testExecutionVertexOperations.enableFailDeploy();
+ testExecutionOperations.enableFailDeploy();
createSchedulerAndStartScheduling(jobGraph);
- testExecutionVertexOperations.disableFailDeploy();
+ testExecutionOperations.disableFailDeploy();
taskRestartExecutor.triggerScheduledTasks();
final List<ExecutionVertexID> deployedExecutionVertices =
- testExecutionVertexOperations.getDeployedVertices();
+ testExecutionOperations.getDeployedVertices();
final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId));
@@ -360,7 +360,7 @@ public class DefaultSchedulerTest extends TestLogger {
taskRestartExecutor.triggerScheduledTasks();
final List<ExecutionVertexID> deployedExecutionVertices =
- testExecutionVertexOperations.getDeployedVertices();
+ testExecutionOperations.getDeployedVertices();
final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId));
}
@@ -537,7 +537,7 @@ public class DefaultSchedulerTest extends TestLogger {
taskRestartExecutor.triggerScheduledTasks();
assertThat(
- testExecutionVertexOperations.getDeployedVertices(),
+ testExecutionOperations.getDeployedVertices(),
containsInAnyOrder(sourceExecutionVertexId, sinkExecutionVertexId));
assertThat(
scheduler.requestJob().getArchivedExecutionGraph().getState(),
@@ -652,7 +652,7 @@ public class DefaultSchedulerTest extends TestLogger {
taskRestartExecutor.triggerScheduledTasks();
final List<ExecutionVertexID> deployedExecutionVertices =
- testExecutionVertexOperations.getDeployedVertices();
+ testExecutionOperations.getDeployedVertices();
final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId));
}
@@ -704,7 +704,7 @@ public class DefaultSchedulerTest extends TestLogger {
new ExecutionVertexID(onlyJobVertex.getID(), 1);
assertThat(
"The execution vertices should be deployed in a specific order reflecting the scheduling start and the global fail-over afterwards.",
- testExecutionVertexOperations.getDeployedVertices(),
+ testExecutionOperations.getDeployedVertices(),
contains(
executionVertexId0,
executionVertexId1,
@@ -881,16 +881,17 @@ public class DefaultSchedulerTest extends TestLogger {
scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
taskRestartExecutor.triggerScheduledTasks();
- final List<ExecutionVertexID> deployedExecutionVertices =
- testExecutionVertexOperations.getDeployedVertices();
// the first task failover should be skipped on state restore failure
+ List<ExecutionVertexID> deployedExecutionVertices =
+ testExecutionOperations.getDeployedVertices();
final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
assertThat(deployedExecutionVertices, contains(executionVertexId));
// a global failure should be triggered on state restore failure
masterHook.disableFailOnRestore();
taskRestartExecutor.triggerScheduledTasks();
+ deployedExecutionVertices = testExecutionOperations.getDeployedVertices();
assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId));
}
@@ -1099,7 +1100,7 @@ public class DefaultSchedulerTest extends TestLogger {
final Set<CompletableFuture<LogicalSlot>> pendingLogicalSlotFutures =
testExecutionSlotAllocator.getPendingRequests().values().stream()
- .map(SlotExecutionVertexAssignment::getLogicalSlotFuture)
+ .map(ExecutionSlotAssignment::getLogicalSlotFuture)
.collect(Collectors.toSet());
assertThat(pendingLogicalSlotFutures, hasSize(parallelism * 2));
@@ -1495,11 +1496,11 @@ public class DefaultSchedulerTest extends TestLogger {
createSchedulerAndStartScheduling(jobGraph);
assertThat(trackedPartitions, hasSize(0));
- assertThat(testExecutionVertexOperations.getDeployedVertices(), hasSize(0));
+ assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
shuffleMaster.completeAllPendingRegistrations();
assertThat(trackedPartitions, hasSize(1));
- assertThat(testExecutionVertexOperations.getDeployedVertices(), hasSize(2));
+ assertThat(testExecutionOperations.getDeployedVertices(), hasSize(2));
}
@Test
@@ -1510,12 +1511,12 @@ public class DefaultSchedulerTest extends TestLogger {
createSchedulerAndStartScheduling(jobGraph);
- assertThat(testExecutionVertexOperations.getCanceledVertices(), hasSize(0));
- assertThat(testExecutionVertexOperations.getFailedVertices(), hasSize(0));
+ assertThat(testExecutionOperations.getCanceledVertices(), hasSize(0));
+ assertThat(testExecutionOperations.getFailedVertices(), hasSize(0));
shuffleMaster.failAllPendingRegistrations();
- assertThat(testExecutionVertexOperations.getCanceledVertices(), hasSize(2));
- assertThat(testExecutionVertexOperations.getFailedVertices(), hasSize(1));
+ assertThat(testExecutionOperations.getCanceledVertices(), hasSize(2));
+ assertThat(testExecutionOperations.getFailedVertices(), hasSize(1));
}
@Test
@@ -1526,8 +1527,8 @@ public class DefaultSchedulerTest extends TestLogger {
createSchedulerAndStartScheduling(jobGraph);
- assertThat(testExecutionVertexOperations.getCanceledVertices(), hasSize(2));
- assertThat(testExecutionVertexOperations.getFailedVertices(), hasSize(1));
+ assertThat(testExecutionOperations.getCanceledVertices(), hasSize(2));
+ assertThat(testExecutionOperations.getFailedVertices(), hasSize(1));
}
@Test
@@ -1546,8 +1547,8 @@ public class DefaultSchedulerTest extends TestLogger {
timeout = Time.milliseconds(1);
createSchedulerAndStartScheduling(jobGraph, mainThreadExecutor);
- testExecutionVertexOperations.awaitCanceledVertices(2);
- testExecutionVertexOperations.awaitFailedVertices(1);
+ testExecutionOperations.awaitCanceledExecutions(2);
+ testExecutionOperations.awaitFailedExecutions(1);
} finally {
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdown();
@@ -1857,7 +1858,7 @@ public class DefaultSchedulerTest extends TestLogger {
.setSchedulingStrategyFactory(new PipelinedRegionSchedulingStrategy.Factory())
.setFailoverStrategyFactory(new RestartPipelinedRegionFailoverStrategy.Factory())
.setRestartBackoffTimeStrategy(testRestartBackoffTimeStrategy)
- .setExecutionVertexOperations(testExecutionVertexOperations)
+ .setExecutionOperations(testExecutionOperations)
.setExecutionVertexVersioner(executionVertexVersioner)
.setExecutionSlotAllocatorFactory(executionSlotAllocatorFactory)
.setShuffleMaster(shuffleMaster)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DeploymentHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DeploymentHandleTest.java
deleted file mode 100644
index 3ad93bbfea3..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DeploymentHandleTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.runtime.scheduler;
-
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.concurrent.CompletableFuture;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/** Unit tests for {@link DeploymentHandle}. */
-public class DeploymentHandleTest {
-
- private static final JobVertexID TEST_JOB_VERTEX_ID = new JobVertexID(0, 0);
-
- private static final ExecutionVertexID TEST_EXECUTION_VERTEX_ID =
- new ExecutionVertexID(TEST_JOB_VERTEX_ID, 0);
-
- private static final ExecutionVertexVersion TEST_EXECUTION_VERTEX_VERSION =
- new ExecutionVertexVersion(TEST_EXECUTION_VERTEX_ID, 0);
-
- private CompletableFuture<LogicalSlot> logicalSlotFuture;
-
- private DeploymentHandle deploymentHandle;
-
- @Before
- public void setUp() {
- logicalSlotFuture = new CompletableFuture<>();
- final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
- new SlotExecutionVertexAssignment(TEST_EXECUTION_VERTEX_ID, logicalSlotFuture);
- deploymentHandle =
- new DeploymentHandle(TEST_EXECUTION_VERTEX_VERSION, slotExecutionVertexAssignment);
- }
-
- @Test
- public void getLogicalSlotThrowsExceptionIfSlotFutureNotCompleted() {
- try {
- assertFalse(deploymentHandle.getLogicalSlot().isPresent());
- fail();
- } catch (IllegalStateException e) {
- assertThat(
- e.getMessage(),
- containsString("method can only be called after slot future is done"));
- }
- }
-
- @Test
- public void slotIsNotPresentIfFutureWasCancelled() {
- logicalSlotFuture.cancel(false);
- assertFalse(deploymentHandle.getLogicalSlot().isPresent());
- }
-
- @Test
- public void slotIsNotPresentIfFutureWasCompletedExceptionally() {
- logicalSlotFuture.completeExceptionally(new RuntimeException("expected"));
- assertFalse(deploymentHandle.getLogicalSlot().isPresent());
- }
-
- @Test
- public void getLogicalSlotReturnsSlotIfFutureCompletedNormally() {
- final LogicalSlot logicalSlot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
- logicalSlotFuture.complete(logicalSlot);
- assertTrue(deploymentHandle.getLogicalSlot().isPresent());
- assertSame(logicalSlot, deploymentHandle.getLogicalSlot().get());
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 464d5c161db..ba6b7a0d880 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -85,6 +85,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import static org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -419,13 +420,14 @@ public class SchedulerTestingUtils {
new RestartPipelinedRegionFailoverStrategy.Factory();
protected RestartBackoffTimeStrategy restartBackoffTimeStrategy =
NoRestartBackoffTimeStrategy.INSTANCE;
- protected ExecutionVertexOperations executionVertexOperations =
- new DefaultExecutionVertexOperations();
+ protected ExecutionOperations executionOperations = new DefaultExecutionOperations();
protected ExecutionVertexVersioner executionVertexVersioner =
new ExecutionVertexVersioner();
protected ExecutionSlotAllocatorFactory executionSlotAllocatorFactory =
new TestExecutionSlotAllocatorFactory();
protected JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC) -> {};
+ protected ExecutionDeployer.Factory executionDeployerFactory =
+ new DefaultExecutionDeployer.Factory();
public DefaultSchedulerBuilder(
final JobGraph jobGraph,
@@ -541,9 +543,9 @@ public class SchedulerTestingUtils {
return this;
}
- public DefaultSchedulerBuilder setExecutionVertexOperations(
- final ExecutionVertexOperations executionVertexOperations) {
- this.executionVertexOperations = executionVertexOperations;
+ public DefaultSchedulerBuilder setExecutionOperations(
+ final ExecutionOperations executionOperations) {
+ this.executionOperations = executionOperations;
return this;
}
@@ -564,6 +566,12 @@ public class SchedulerTestingUtils {
return this;
}
+ public DefaultSchedulerBuilder setExecutionDeployerFactory(
+ ExecutionDeployer.Factory executionDeployerFactory) {
+ this.executionDeployerFactory = executionDeployerFactory;
+ return this;
+ }
+
public DefaultScheduler build() throws Exception {
final ExecutionGraphFactory executionGraphFactory =
new DefaultExecutionGraphFactory(
@@ -592,7 +600,7 @@ public class SchedulerTestingUtils {
schedulingStrategyFactory,
failoverStrategyFactory,
restartBackoffTimeStrategy,
- executionVertexOperations,
+ executionOperations,
executionVertexVersioner,
executionSlotAllocatorFactory,
System.currentTimeMillis(),
@@ -600,7 +608,9 @@ public class SchedulerTestingUtils {
jobStatusListener,
executionGraphFactory,
shuffleMaster,
- rpcTimeout);
+ rpcTimeout,
+ computeVertexParallelismStore(jobGraph),
+ executionDeployerFactory);
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
index cee964ae179..aeac6d7cc11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfileTestingUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingPayload;
@@ -53,6 +54,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createRandomExecutionVertexId;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -113,9 +115,9 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
AllocationContext context =
AllocationContext.newBuilder().addGroup(EV1, EV2).addGroup(EV3, EV4).build();
- List<SlotExecutionVertexAssignment> executionVertexAssignments =
+ List<ExecutionSlotAssignment> executionSlotAssignments =
context.allocateSlotsFor(EV1, EV2, EV3, EV4);
- Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments);
+ Collection<ExecutionVertexID> assignIds = getAssignIds(executionSlotAssignments);
assertThat(assignIds, containsInAnyOrder(EV1, EV2, EV3, EV4));
assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2));
@@ -126,9 +128,8 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
AllocationContext context = AllocationContext.newBuilder().addGroup(EV1, EV2).build();
context.allocateSlotsFor(EV1);
- List<SlotExecutionVertexAssignment> executionVertexAssignments =
- context.allocateSlotsFor(EV2);
- Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments);
+ List<ExecutionSlotAssignment> executionSlotAssignments = context.allocateSlotsFor(EV2);
+ Collection<ExecutionVertexID> assignIds = getAssignIds(executionSlotAssignments);
// execution 0 from the first allocateSlotsFor call and execution 1 from the second
// allocateSlotsFor call
@@ -142,8 +143,8 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
throws ExecutionException, InterruptedException {
AllocationContext context = AllocationContext.newBuilder().addGroup(EV1).build();
- SlotExecutionVertexAssignment assignment1 = context.allocateSlotsFor(EV1).get(0);
- SlotExecutionVertexAssignment assignment2 = context.allocateSlotsFor(EV1).get(0);
+ ExecutionSlotAssignment assignment1 = context.allocateSlotsFor(EV1).get(0);
+ ExecutionSlotAssignment assignment2 = context.allocateSlotsFor(EV1).get(0);
assertThat(
assignment1.getLogicalSlotFuture().get()
@@ -226,7 +227,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
true,
true,
(context, assignment) -> {
- context.getAllocator().cancel(assignment.getExecutionVertexId());
+ context.getAllocator().cancel(assignment.getExecutionAttemptId());
try {
assignment.getLogicalSlotFuture().get();
fail("The logical future must finish with the cancellation exception");
@@ -245,7 +246,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
false,
false,
(context, assignment) -> {
- context.getAllocator().cancel(assignment.getExecutionVertexId());
+ context.getAllocator().cancel(assignment.getExecutionAttemptId());
assignment.getLogicalSlotFuture().get();
});
}
@@ -253,7 +254,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
private static void testLogicalSlotRequestCancellationOrRelease(
boolean completePhysicalSlotFutureManually,
boolean cancelsPhysicalSlotRequestAndRemovesSharedSlot,
- BiConsumerWithException<AllocationContext, SlotExecutionVertexAssignment, Exception>
+ BiConsumerWithException<AllocationContext, ExecutionSlotAssignment, Exception>
cancelOrReleaseAction)
throws Exception {
AllocationContext.Builder allocationContextBuilder =
@@ -264,19 +265,19 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
}
AllocationContext context = allocationContextBuilder.build();
- List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(EV1, EV2);
+ List<ExecutionSlotAssignment> assignments = context.allocateSlotsFor(EV1, EV2);
assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1));
// cancel or release only one sharing logical slots
cancelOrReleaseAction.accept(context, assignments.get(0));
- List<SlotExecutionVertexAssignment> assignmentsAfterOneCancellation =
+ List<ExecutionSlotAssignment> assignmentsAfterOneCancellation =
context.allocateSlotsFor(EV1, EV2);
// there should be no more physical slot allocations, as the first logical slot reuses the
// previous shared slot
assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1));
// cancel or release all sharing logical slots
- for (SlotExecutionVertexAssignment assignment : assignmentsAfterOneCancellation) {
+ for (ExecutionSlotAssignment assignment : assignmentsAfterOneCancellation) {
cancelOrReleaseAction.accept(context, assignment);
}
SlotRequestId slotRequestId =
@@ -298,7 +299,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
public void testPhysicalSlotReleaseLogicalSlots()
throws ExecutionException, InterruptedException {
AllocationContext context = AllocationContext.newBuilder().addGroup(EV1, EV2).build();
- List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(EV1, EV2);
+ List<ExecutionSlotAssignment> assignments = context.allocateSlotsFor(EV1, EV2);
List<TestingPayload> payloads =
assignments.stream()
.map(
@@ -376,10 +377,10 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
// allocate 2 physical slots for 2 groups
- List<SlotExecutionVertexAssignment> assignments1 = context.allocateSlotsFor(EV1, EV3);
+ List<ExecutionSlotAssignment> assignments1 = context.allocateSlotsFor(EV1, EV3);
fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, new AllocationID());
PhysicalSlotRequestBulk bulk1 = bulkChecker.getBulk();
- List<SlotExecutionVertexAssignment> assignments2 = context.allocateSlotsFor(EV2);
+ List<ExecutionSlotAssignment> assignments2 = context.allocateSlotsFor(EV2);
// cancelling of (EV1, EV3) releases assignments1 and only one physical slot for EV3
// the second physical slot is held by sharing EV2 from the next bulk
@@ -438,10 +439,9 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
new FlinkException("test failure")))
.build();
- final List<SlotExecutionVertexAssignment> allocatedSlots =
- context.allocateSlotsFor(EV1, EV2);
+ final List<ExecutionSlotAssignment> allocatedSlots = context.allocateSlotsFor(EV1, EV2);
- for (SlotExecutionVertexAssignment allocatedSlot : allocatedSlots) {
+ for (ExecutionSlotAssignment allocatedSlot : allocatedSlots) {
assertTrue(allocatedSlot.getLogicalSlotFuture().isCompletedExceptionally());
}
@@ -473,9 +473,10 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
}
private static List<ExecutionVertexID> getAssignIds(
- Collection<SlotExecutionVertexAssignment> assignments) {
+ Collection<ExecutionSlotAssignment> assignments) {
return assignments.stream()
- .map(SlotExecutionVertexAssignment::getExecutionVertexId)
+ .map(ExecutionSlotAssignment::getExecutionAttemptId)
+ .map(ExecutionAttemptID::getExecutionVertexId)
.collect(Collectors.toList());
}
@@ -524,8 +525,13 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
return allocator;
}
- private List<SlotExecutionVertexAssignment> allocateSlotsFor(ExecutionVertexID... ids) {
- return allocator.allocateSlotsFor(Arrays.asList(ids));
+ private List<ExecutionSlotAssignment> allocateSlotsFor(ExecutionVertexID... ids) {
+ return allocator.allocateSlotsFor(
+ Arrays.stream(ids)
+ .map(
+ executionVertexId ->
+ createExecutionAttemptId(executionVertexId, 0))
+ .collect(Collectors.toList()));
}
private TestingSlotSharingStrategy getSlotSharingStrategy() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionOperationsDecorator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionOperationsDecorator.java
new file mode 100644
index 00000000000..cd4205bba15
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionOperationsDecorator.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link ExecutionOperations} decorator that enables instrumentation of execution operations for
+ * testing purposes.
+ *
+ * <p>Every call to {@link #deploy}, {@link #cancel} and {@link #markFailed} is recorded and then
+ * forwarded to the wrapped {@link ExecutionOperations}, except that {@link #deploy} throws instead
+ * of forwarding while {@link #enableFailDeploy()} is in effect.
+ */
+public class TestExecutionOperationsDecorator implements ExecutionOperations {
+
+    private final ExecutionOperations delegate;
+
+    private final CountLatch deployedExecutions = new CountLatch();
+    private final CountLatch canceledExecutions = new CountLatch();
+    private final CountLatch failedExecutions = new CountLatch();
+
+    /**
+     * Whether {@link #deploy} throws instead of delegating. Volatile because the flag may be
+     * toggled from a different thread than the one calling {@link #deploy}.
+     */
+    private volatile boolean failDeploy;
+
+    public TestExecutionOperationsDecorator(final ExecutionOperations delegate) {
+        this.delegate = checkNotNull(delegate);
+    }
+
+    /**
+     * Records the execution, then throws a {@link RuntimeException} if failing deployments is
+     * enabled, or otherwise forwards the deployment to the delegate.
+     */
+    @Override
+    public void deploy(final Execution execution) throws JobException {
+        deployedExecutions.add(execution);
+        if (failDeploy) {
+            throw new RuntimeException("Expected");
+        }
+        delegate.deploy(execution);
+    }
+
+    /** Records the execution and forwards the cancellation to the delegate. */
+    @Override
+    public CompletableFuture<?> cancel(final Execution execution) {
+        canceledExecutions.add(execution);
+        return delegate.cancel(execution);
+    }
+
+    /** Records the execution and forwards the failure to the delegate. */
+    @Override
+    public void markFailed(final Execution execution, final Throwable cause) {
+        failedExecutions.add(execution);
+        delegate.markFailed(execution, cause);
+    }
+
+    /** Makes subsequent {@link #deploy} calls throw a {@link RuntimeException}. */
+    public void enableFailDeploy() {
+        failDeploy = true;
+    }
+
+    /** Makes subsequent {@link #deploy} calls forward to the delegate again. */
+    public void disableFailDeploy() {
+        failDeploy = false;
+    }
+
+    /** Returns the attempt ids of all executions passed to {@link #deploy}, in call order. */
+    public List<ExecutionAttemptID> getDeployedExecutions() {
+        return deployedExecutions.getExecutions();
+    }
+
+    /** Returns the attempt ids of all executions passed to {@link #cancel}, in call order. */
+    public List<ExecutionAttemptID> getCanceledExecutions() {
+        return canceledExecutions.getExecutions();
+    }
+
+    /** Returns the attempt ids of all executions passed to {@link #markFailed}, in call order. */
+    public List<ExecutionAttemptID> getFailedExecutions() {
+        return failedExecutions.getExecutions();
+    }
+
+    /** Returns the vertex ids of all executions passed to {@link #deploy}, in call order. */
+    public List<ExecutionVertexID> getDeployedVertices() {
+        return deployedExecutions.getVertices();
+    }
+
+    /** Returns the vertex ids of all executions passed to {@link #cancel}, in call order. */
+    public List<ExecutionVertexID> getCanceledVertices() {
+        return canceledExecutions.getVertices();
+    }
+
+    /** Returns the vertex ids of all executions passed to {@link #markFailed}, in call order. */
+    public List<ExecutionVertexID> getFailedVertices() {
+        return failedExecutions.getVertices();
+    }
+
+    /** Waits until the given number of executions have been canceled. */
+    public void awaitCanceledExecutions(int count) throws InterruptedException {
+        canceledExecutions.await(count);
+    }
+
+    /** Waits until the given number of executions have been marked as failed. */
+    public void awaitFailedExecutions(int count) throws InterruptedException {
+        failedExecutions.await(count);
+    }
+
+    /**
+     * Thread-safe record of executions that allows waiting until a minimum number of executions
+     * has been recorded.
+     */
+    private static class CountLatch {
+        @GuardedBy("lock")
+        private final List<Execution> executions = new ArrayList<>();
+
+        private final Object lock = new Object();
+
+        /** Records the given execution and wakes up all threads blocked in {@link #await}. */
+        public void add(Execution execution) {
+            synchronized (lock) {
+                executions.add(execution);
+                lock.notifyAll();
+            }
+        }
+
+        /** Blocks until at least {@code count} executions have been recorded. */
+        public void await(int count) throws InterruptedException {
+            synchronized (lock) {
+                while (executions.size() < count) {
+                    lock.wait();
+                }
+            }
+        }
+
+        /** Returns a snapshot of the attempt ids of the recorded executions. */
+        public List<ExecutionAttemptID> getExecutions() {
+            synchronized (lock) {
+                return executions.stream()
+                        .map(Execution::getAttemptId)
+                        .collect(Collectors.toList());
+            }
+        }
+
+        /** Returns a snapshot of the vertex ids of the recorded executions. */
+        public List<ExecutionVertexID> getVertices() {
+            synchronized (lock) {
+                return executions.stream()
+                        .map(Execution::getVertex)
+                        .map(ExecutionVertex::getID)
+                        .collect(Collectors.toList());
+            }
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
index 095903e9968..cb36635d767 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.scheduler;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
@@ -39,7 +40,7 @@ import static org.apache.flink.util.Preconditions.checkState;
/** Test {@link ExecutionSlotAllocator} implementation. */
public class TestExecutionSlotAllocator implements ExecutionSlotAllocator, SlotOwner {
- private final Map<ExecutionVertexID, SlotExecutionVertexAssignment> pendingRequests =
+ private final Map<ExecutionAttemptID, ExecutionSlotAssignment> pendingRequests =
new HashMap<>();
private final TestingLogicalSlotBuilder logicalSlotBuilder;
@@ -63,33 +64,34 @@ public class TestExecutionSlotAllocator implements ExecutionSlotAllocator, SlotO
}
@Override
- public List<SlotExecutionVertexAssignment> allocateSlotsFor(
- final List<ExecutionVertexID> executionVertexIds) {
- final List<SlotExecutionVertexAssignment> slotVertexAssignments =
- createSlotVertexAssignments(executionVertexIds);
- registerPendingRequests(slotVertexAssignments);
+ public List<ExecutionSlotAssignment> allocateSlotsFor(
+ final List<ExecutionAttemptID> executionAttemptIds) {
+ final List<ExecutionSlotAssignment> executionSlotAssignments =
+ createExecutionSlotAssignments(executionAttemptIds);
+ registerPendingRequests(executionSlotAssignments);
maybeCompletePendingRequests();
- return slotVertexAssignments;
+ return executionSlotAssignments;
}
- private void registerPendingRequests(
- final List<SlotExecutionVertexAssignment> slotVertexAssignments) {
- for (SlotExecutionVertexAssignment slotVertexAssignment : slotVertexAssignments) {
- pendingRequests.put(slotVertexAssignment.getExecutionVertexId(), slotVertexAssignment);
- }
- }
+ private List<ExecutionSlotAssignment> createExecutionSlotAssignments(
+ final List<ExecutionAttemptID> executionAttemptIds) {
- private List<SlotExecutionVertexAssignment> createSlotVertexAssignments(
- final Collection<ExecutionVertexID> executionVertexIds) {
-
- final List<SlotExecutionVertexAssignment> result = new ArrayList<>();
- for (ExecutionVertexID executionVertexId : executionVertexIds) {
+ final List<ExecutionSlotAssignment> result = new ArrayList<>();
+ for (ExecutionAttemptID executionAttemptId : executionAttemptIds) {
final CompletableFuture<LogicalSlot> logicalSlotFuture = new CompletableFuture<>();
- result.add(new SlotExecutionVertexAssignment(executionVertexId, logicalSlotFuture));
+ result.add(new ExecutionSlotAssignment(executionAttemptId, logicalSlotFuture));
}
return result;
}
+ private void registerPendingRequests(
+ final List<ExecutionSlotAssignment> executionSlotAssignments) {
+ for (ExecutionSlotAssignment executionSlotAssignment : executionSlotAssignments) {
+ pendingRequests.put(
+ executionSlotAssignment.getExecutionAttemptId(), executionSlotAssignment);
+ }
+ }
+
private void maybeCompletePendingRequests() {
if (autoCompletePendingRequests) {
completePendingRequests();
@@ -97,36 +99,71 @@ public class TestExecutionSlotAllocator implements ExecutionSlotAllocator, SlotO
}
public void completePendingRequests() {
-        final Collection<ExecutionVertexID> vertexIds = new ArrayList<>(pendingRequests.keySet());
-        vertexIds.forEach(this::completePendingRequest);
+        final Collection<ExecutionAttemptID> executionIds =
+                new ArrayList<>(pendingRequests.keySet());
+        executionIds.forEach(this::completePendingRequest);
}
-    public LogicalSlot completePendingRequest(final ExecutionVertexID executionVertexId) {
+    public LogicalSlot completePendingRequest(final ExecutionAttemptID executionAttemptId) {
final LogicalSlot slot = logicalSlotBuilder.setSlotOwner(this).createTestingLogicalSlot();
-        final SlotExecutionVertexAssignment slotVertexAssignment =
-                removePendingRequest(executionVertexId);
-        checkState(slotVertexAssignment != null);
-        slotVertexAssignment.getLogicalSlotFuture().complete(slot);
+        final ExecutionSlotAssignment executionSlotAssignment =
+                removePendingRequest(executionAttemptId);
+        checkState(executionSlotAssignment != null);
+        executionSlotAssignment.getLogicalSlotFuture().complete(slot);
return slot;
}
-    private SlotExecutionVertexAssignment removePendingRequest(
-            final ExecutionVertexID executionVertexId) {
-        return pendingRequests.remove(executionVertexId);
+    /** Completes the single pending request that belongs to the given execution vertex. */
+    public LogicalSlot completePendingRequest(final ExecutionVertexID executionVertexId) {
+        return completePendingRequest(findExecutionIdByVertexId(executionVertexId));
+    }
+
+    private ExecutionSlotAssignment removePendingRequest(
+            final ExecutionAttemptID executionAttemptId) {
+        return pendingRequests.remove(executionAttemptId);
}
public void timeoutPendingRequests() {
-        final Collection<ExecutionVertexID> vertexIds = new ArrayList<>(pendingRequests.keySet());
-        vertexIds.forEach(this::timeoutPendingRequest);
+        final Collection<ExecutionAttemptID> executionIds =
+                new ArrayList<>(pendingRequests.keySet());
+        executionIds.forEach(this::timeoutPendingRequest);
}
-    public void timeoutPendingRequest(final ExecutionVertexID executionVertexId) {
-        final SlotExecutionVertexAssignment slotVertexAssignment =
-                removePendingRequest(executionVertexId);
-        checkState(slotVertexAssignment != null);
-        slotVertexAssignment.getLogicalSlotFuture().completeExceptionally(new TimeoutException());
+    public void timeoutPendingRequest(final ExecutionAttemptID executionAttemptId) {
+        final ExecutionSlotAssignment executionSlotAssignment =
+                removePendingRequest(executionAttemptId);
+        checkState(executionSlotAssignment != null);
+        executionSlotAssignment
+                .getLogicalSlotFuture()
+                .completeExceptionally(new TimeoutException());
}
+    /** Times out the single pending request that belongs to the given execution vertex. */
+    public void timeoutPendingRequest(final ExecutionVertexID executionVertexId) {
+        timeoutPendingRequest(findExecutionIdByVertexId(executionVertexId));
+    }
+
+    /**
+     * Returns the id of the single pending request that belongs to the given execution vertex.
+     *
+     * @throws IllegalStateException if there is not exactly one such pending request
+     */
+    private ExecutionAttemptID findExecutionIdByVertexId(
+            final ExecutionVertexID executionVertexId) {
+        final List<ExecutionAttemptID> executionIds = new ArrayList<>();
+        for (ExecutionAttemptID executionAttemptId : pendingRequests.keySet()) {
+            if (executionAttemptId.getExecutionVertexId().equals(executionVertexId)) {
+                executionIds.add(executionAttemptId);
+            }
+        }
+        checkState(
+                executionIds.size() == 1,
+                "Expected exactly one pending ExecutionAttemptID for %s, but found %s.",
+                executionVertexId,
+                executionIds);
+        return executionIds.get(0);
+    }
+
public void enableAutoCompletePendingRequests() {
autoCompletePendingRequests = true;
}
@@ -140,11 +177,11 @@ public class TestExecutionSlotAllocator implements ExecutionSlotAllocator, SlotO
}
@Override
-    public void cancel(final ExecutionVertexID executionVertexId) {
-        final SlotExecutionVertexAssignment slotVertexAssignment =
-                removePendingRequest(executionVertexId);
-        if (slotVertexAssignment != null) {
-            slotVertexAssignment.getLogicalSlotFuture().cancel(false);
+    public void cancel(final ExecutionAttemptID executionAttemptId) {
+        final ExecutionSlotAssignment executionSlotAssignment =
+                removePendingRequest(executionAttemptId);
+        if (executionSlotAssignment != null) {
+            executionSlotAssignment.getLogicalSlotFuture().cancel(false);
}
}
@@ -158,10 +195,10 @@ public class TestExecutionSlotAllocator implements ExecutionSlotAllocator, SlotO
final LogicalSlot slot =
logicalSlotBuilder.setSlotOwner(this).createTestingLogicalSlot();
-                final SlotExecutionVertexAssignment slotVertexAssignment =
+                final ExecutionSlotAssignment executionSlotAssignment =
pendingRequests.remove(pendingRequests.keySet().stream().findAny().get());
-                slotVertexAssignment.getLogicalSlotFuture().complete(slot);
+                executionSlotAssignment.getLogicalSlotFuture().complete(slot);
}
}
}
@@ -174,7 +211,7 @@ public class TestExecutionSlotAllocator implements ExecutionSlotAllocator, SlotO
return logicalSlotBuilder;
}
-    public Map<ExecutionVertexID, SlotExecutionVertexAssignment> getPendingRequests() {
+    public Map<ExecutionAttemptID, ExecutionSlotAssignment> getPendingRequests() {
return Collections.unmodifiableMap(pendingRequests);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionVertexOperationsDecorator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionVertexOperationsDecorator.java
deleted file mode 100644
index 1323510e422..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionVertexOperationsDecorator.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.runtime.scheduler;
-
-import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-
-import javax.annotation.concurrent.GuardedBy;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * {@link ExecutionVertexOperations} decorator that enables instrumentation of execution vertex
- * operations for testing purposes.
- */
-public class TestExecutionVertexOperationsDecorator implements ExecutionVertexOperations {
-
- private final ExecutionVertexOperations delegate;
-
- private final CountLatch deployedVertices = new CountLatch();
- private final CountLatch canceledVertices = new CountLatch();
- private final CountLatch failedVertices = new CountLatch();
-
- private boolean failDeploy;
-
- public TestExecutionVertexOperationsDecorator(final ExecutionVertexOperations delegate) {
- this.delegate = checkNotNull(delegate);
- }
-
- @Override
- public void deploy(final ExecutionVertex executionVertex) throws JobException {
- deployedVertices.add(executionVertex.getID());
-
- if (failDeploy) {
- throw new RuntimeException("Expected");
- }
-
- delegate.deploy(executionVertex);
- }
-
- @Override
- public CompletableFuture<?> cancel(final ExecutionVertex executionVertex) {
- canceledVertices.add(executionVertex.getID());
- return delegate.cancel(executionVertex);
- }
-
- @Override
- public void markFailed(ExecutionVertex executionVertex, Throwable cause) {
- failedVertices.add(executionVertex.getID());
- delegate.markFailed(executionVertex, cause);
- }
-
- public void enableFailDeploy() {
- failDeploy = true;
- }
-
- public void disableFailDeploy() {
- failDeploy = false;
- }
-
- public List<ExecutionVertexID> getDeployedVertices() {
- return deployedVertices.getVertices();
- }
-
- public List<ExecutionVertexID> getCanceledVertices() {
- return canceledVertices.getVertices();
- }
-
- public List<ExecutionVertexID> getFailedVertices() {
- return failedVertices.getVertices();
- }
-
- /** Waits until the given number of vertices have been canceled. */
- public void awaitCanceledVertices(int count) throws InterruptedException {
- canceledVertices.await(count);
- }
-
- /** Waits until the given number of vertices have been failed. */
- public void awaitFailedVertices(int count) throws InterruptedException {
- failedVertices.await(count);
- }
-
- private static class CountLatch {
- @GuardedBy("lock")
- private final List<ExecutionVertexID> vertices = new ArrayList<>();
-
- private final Object lock = new Object();
-
- public void add(ExecutionVertexID executionVertexId) {
- synchronized (lock) {
- vertices.add(executionVertexId);
- lock.notifyAll();
- }
- }
-
- public void await(int count) throws InterruptedException {
- synchronized (lock) {
- while (vertices.size() < count) {
- lock.wait();
- }
- }
- }
-
- public List<ExecutionVertexID> getVertices() {
- synchronized (lock) {
- return Collections.unmodifiableList(vertices);
- }
- }
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
index 98152ee9546..12740adb7cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
@@ -88,7 +88,7 @@ public class AdaptiveBatchSchedulerTestUtils {
schedulingStrategyFactory,
failoverStrategyFactory,
restartBackoffTimeStrategy,
- executionVertexOperations,
+ executionOperations,
executionVertexVersioner,
executionSlotAllocatorFactory,
System.currentTimeMillis(),