You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/02/18 09:14:59 UTC
[flink] 11/12: [FLINK-21100][coordination] Add DeclarativeScheduler
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0efe64c4297d0f8b46ec22b4be88064d530bf488
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Feb 8 20:38:40 2021 +0100
[FLINK-21100][coordination] Add DeclarativeScheduler
---
.../flink/configuration/JobManagerOptions.java | 8 +-
.../dispatcher/DefaultJobManagerRunnerFactory.java | 5 +-
.../dispatcher/SchedulerNGFactoryFactory.java | 23 +-
.../failover/flip1/ExecutionFailureHandler.java | 4 +-
.../jobmaster/slotpool/SlotPoolServiceFactory.java | 9 +-
...pdateSchedulerNgOnInternalFailuresListener.java | 2 +-
.../declarative/DeclarativeScheduler.java | 930 +++++++++++++++++++++
.../declarative/DeclarativeSchedulerFactory.java | 110 +++
.../declarative/JobGraphJobInformation.java | 101 +++
.../ParallelismAndResourceAssignments.java | 50 ++
.../declarative/allocator/JobInformation.java | 8 +
.../declarative/allocator/SharedSlot.java | 4 +-
.../dispatcher/SchedulerNGFactoryFactoryTest.java | 16 +-
.../flink/runtime/jobmaster/JobMasterTest.java | 6 +-
.../slotpool/DefaultDeclarativeSlotPoolTest.java | 4 +-
.../runtime/jobmaster/utils/JobMasterBuilder.java | 6 +-
.../declarative/DeclarativeSchedulerBuilder.java | 189 +++++
.../DeclarativeSchedulerClusterITCase.java | 147 ++++
.../DeclarativeSchedulerSimpleITCase.java | 163 ++++
.../DeclarativeSchedulerSlotSharingITCase.java | 113 +++
.../declarative/DeclarativeSchedulerTest.java | 617 ++++++++++++++
.../declarative/allocator/SharedSlotTest.java | 11 +
.../testtasks/OnceBlockingNoOpInvokable.java | 88 ++
.../scheduling/DeclarativeSchedulerITCase.java | 150 ++++
24 files changed, 2744 insertions(+), 20 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 06da369..3e86c69 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -356,12 +356,16 @@ public class JobManagerOptions {
Description.builder()
.text(
"Determines which scheduler implementation is used to schedule tasks. Accepted values are:")
- .list(text("'Ng': new generation scheduler"))
+ .list(
+ text("'Ng': new generation scheduler"),
+ text(
+ "'Declarative': declarative scheduler; supports reactive mode"))
.build());
/** Type of scheduler implementation. */
public enum SchedulerType {
- Ng
+ Ng,
+ Declarative
}
@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
index b063a0f..839faeb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
@@ -57,9 +57,10 @@ public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
JobMasterConfiguration.fromConfiguration(configuration);
final SlotPoolServiceFactory slotPoolFactory =
- SlotPoolServiceFactory.fromConfiguration(configuration);
+ SlotPoolServiceFactory.fromConfiguration(configuration, jobGraph.getJobType());
final SchedulerNGFactory schedulerNGFactory =
- SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
+ SchedulerNGFactoryFactory.createSchedulerNGFactory(
+ configuration, jobGraph.getJobType());
final ShuffleMaster<?> shuffleMaster =
ShuffleServiceLoader.loadShuffleServiceFactory(configuration)
.createShuffleMaster(configuration);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
index 34eae71..a7e5b02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
@@ -21,20 +21,39 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import org.apache.flink.runtime.scheduler.declarative.DeclarativeSchedulerFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Factory for {@link SchedulerNGFactory}. */
public final class SchedulerNGFactoryFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(SchedulerNGFactoryFactory.class);
+
private SchedulerNGFactoryFactory() {}
- public static SchedulerNGFactory createSchedulerNGFactory(final Configuration configuration) {
- final JobManagerOptions.SchedulerType schedulerType =
+ public static SchedulerNGFactory createSchedulerNGFactory(
+ final Configuration configuration, JobType jobType) {
+ JobManagerOptions.SchedulerType schedulerType =
configuration.get(JobManagerOptions.SCHEDULER);
+
+ if (schedulerType == JobManagerOptions.SchedulerType.Declarative
+ && jobType == JobType.BATCH) {
+ LOG.info(
+ "Declarative Scheduler configured, but Batch job detected. Changing scheduler type to NG / DefaultScheduler.");
+ // overwrite
+ schedulerType = JobManagerOptions.SchedulerType.Ng;
+ }
+
switch (schedulerType) {
case Ng:
return new DefaultSchedulerFactory();
+ case Declarative:
+ return new DeclarativeSchedulerFactory();
default:
throw new IllegalArgumentException(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
index 1bfe42c..6961b45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
@@ -17,7 +17,6 @@
package org.apache.flink.runtime.executiongraph.failover.flip1;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
@@ -122,8 +121,7 @@ public class ExecutionFailureHandler {
}
}
- @VisibleForTesting
- static boolean isUnrecoverableError(Throwable cause) {
+ public static boolean isUnrecoverableError(Throwable cause) {
Optional<Throwable> unrecoverableError =
ThrowableClassifier.findThrowableOfThrowableType(
cause, ThrowableType.NonRecoverableError);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
index 8a2f078..b486747 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.util.clock.SystemClock;
import javax.annotation.Nonnull;
@@ -34,7 +35,7 @@ public interface SlotPoolServiceFactory {
@Nonnull
SlotPoolService createSlotPoolService(@Nonnull JobID jobId);
- static SlotPoolServiceFactory fromConfiguration(Configuration configuration) {
+ static SlotPoolServiceFactory fromConfiguration(Configuration configuration, JobType jobType) {
final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
final Time slotIdleTimeout =
Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));
@@ -42,6 +43,12 @@ public interface SlotPoolServiceFactory {
Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
if (ClusterOptions.isDeclarativeResourceManagementEnabled(configuration)) {
+ if (configuration.get(JobManagerOptions.SCHEDULER)
+ == JobManagerOptions.SchedulerType.Declarative
+ && jobType == JobType.STREAMING) {
+ return new DeclarativeSlotPoolServiceFactory(
+ SystemClock.getInstance(), slotIdleTimeout, rpcTimeout);
+ }
return new DeclarativeSlotPoolBridgeServiceFactory(
SystemClock.getInstance(), rpcTimeout, slotIdleTimeout, batchSlotTimeout);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalFailuresListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalFailuresListener.java
index aa3763a..b97620e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalFailuresListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalFailuresListener.java
@@ -31,7 +31,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* Calls {@link SchedulerNG#updateTaskExecutionState(TaskExecutionStateTransition)} on task failure.
* Calls {@link SchedulerNG#handleGlobalFailure(Throwable)} on global failures.
*/
-class UpdateSchedulerNgOnInternalFailuresListener implements InternalFailuresListener {
+public class UpdateSchedulerNgOnInternalFailuresListener implements InternalFailuresListener {
private final SchedulerNG schedulerNg;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java
new file mode 100644
index 0000000..ac89434
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java
@@ -0,0 +1,930 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
+import org.apache.flink.runtime.scheduler.declarative.allocator.SlotAllocator;
+import org.apache.flink.runtime.scheduler.declarative.allocator.SlotSharingSlotAllocator;
+import org.apache.flink.runtime.scheduler.declarative.allocator.VertexParallelism;
+import org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ReactiveScaleUpController;
+import org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ScaleUpController;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link SchedulerNG} implementation that uses declarative resource management and
+ * automatically adapts the parallelism in case not enough resources could be acquired to run at the
+ * configured parallelism, as described in FLIP-160.
+ *
+ * <p>This scheduler only supports jobs with streaming semantics, i.e., all vertices are connected
+ * via pipelined data-exchanges.
+ *
+ * <p>The implementation is spread over multiple {@link State} classes that control which RPCs are
+ * allowed in a given state and what state transitions are possible (see the FLIP for an overview).
+ * This class can thus be roughly split into 2 parts:
+ *
+ * <p>1) RPCs, which must forward the call to the state via {@link State#tryRun(Class,
+ * ThrowingConsumer, String)} or {@link State#tryCall(Class, FunctionWithException, String)}.
+ *
+ * <p>2) Context methods, which are called by states, to either transition into another state or
+ * access functionality of some component in the scheduler.
+ */
+public class DeclarativeScheduler
+ implements SchedulerNG,
+ Created.Context,
+ WaitingForResources.Context,
+ Executing.Context,
+ Restarting.Context,
+ Failing.Context,
+ Finished.Context {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DeclarativeScheduler.class);
+
+ /** Wraps the submitted job graph; used to determine parallelism and copied to build execution graphs. */
+ private final JobGraphJobInformation jobInformation;
+
+ /** Source of free slots; the slot allocator reserves and frees slots through it. */
+ private final DeclarativeSlotPool declarativeSlotPool;
+
+ /** Initialization timestamp handed in at construction (its use lies outside this view). */
+ private final long initializationTimestamp;
+
+ // Components handed in at construction; presumably used when building execution graphs
+ // (the consuming code is not fully visible here - TODO confirm).
+ private final Configuration configuration;
+ private final ScheduledExecutorService futureExecutor;
+ private final Executor ioExecutor;
+ private final ClassLoader userCodeClassLoader;
+ private final Time rpcTimeout;
+ private final BlobWriter blobWriter;
+ private final ShuffleMaster<?> shuffleMaster;
+ private final JobMasterPartitionTracker partitionTracker;
+ private final ExecutionDeploymentTracker executionDeploymentTracker;
+ private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
+
+ // Checkpointing components; the store and ID counter are created through the SchedulerUtils
+ // "...IfCheckpointingIsEnabled" helpers in the constructor.
+ private final CompletedCheckpointStore completedCheckpointStore;
+ private final CheckpointIDCounter checkpointIdCounter;
+ private final CheckpointsCleaner checkpointsCleaner;
+
+ /** Returned by getTerminationFuture(); completed elsewhere in this class (not visible here). */
+ private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+ private final RestartBackoffTimeStrategy restartBackoffTimeStrategy;
+
+ private final ComponentMainThreadExecutor componentMainThreadExecutor;
+ private final FatalErrorHandler fatalErrorHandler;
+
+ private final JobStatusListener jobStatusListener;
+
+ /** Determines the vertex parallelism from free slots and reserves slots accordingly. */
+ private final SlotAllocator<?> slotAllocator;
+
+ /** Scale-up policy; created as a ReactiveScaleUpController in the constructor. */
+ private final ScaleUpController scaleUpController;
+
+ // Current state of the state machine; RPCs are forwarded to it. Starts in Created and is
+ // presumably reassigned on state transitions (transition code is not visible here).
+ private State state = new Created(this, LOG);
+
+    /**
+     * Creates a declarative scheduler for the given job.
+     *
+     * <p>The given {@code jobGraph} is validated first and then modified in place: every vertex
+     * using {@link ExecutionConfig#PARALLELISM_DEFAULT} is set to a parallelism of 1.
+     *
+     * <p>The new-slots listener is registered with {@code declarativeSlotPool} as the very last
+     * step, so that all final fields are assigned before {@code this} is handed out to the slot
+     * pool, and no listener is registered if construction fails midway.
+     *
+     * @throws IllegalStateException if the job is not a fully pipelined streaming job
+     * @throws JobExecutionException if a scheduler component cannot be created (presumably the
+     *     checkpoint store or checkpoint ID counter - TODO confirm)
+     */
+    public DeclarativeScheduler(
+            JobGraph jobGraph,
+            Configuration configuration,
+            DeclarativeSlotPool declarativeSlotPool,
+            ScheduledExecutorService futureExecutor,
+            Executor ioExecutor,
+            ClassLoader userCodeClassLoader,
+            CheckpointRecoveryFactory checkpointRecoveryFactory,
+            Time rpcTimeout,
+            BlobWriter blobWriter,
+            JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            ShuffleMaster<?> shuffleMaster,
+            JobMasterPartitionTracker partitionTracker,
+            RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            ExecutionDeploymentTracker executionDeploymentTracker,
+            long initializationTimestamp,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            FatalErrorHandler fatalErrorHandler,
+            JobStatusListener jobStatusListener)
+            throws JobExecutionException {
+
+        ensureFullyPipelinedStreamingJob(jobGraph);
+
+        this.declarativeSlotPool = declarativeSlotPool;
+        this.initializationTimestamp = initializationTimestamp;
+        this.configuration = configuration;
+        this.futureExecutor = futureExecutor;
+        this.ioExecutor = ioExecutor;
+        this.userCodeClassLoader = userCodeClassLoader;
+        this.rpcTimeout = rpcTimeout;
+        this.blobWriter = blobWriter;
+        this.shuffleMaster = shuffleMaster;
+        this.partitionTracker = partitionTracker;
+        this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+        this.executionDeploymentTracker = executionDeploymentTracker;
+        this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+        this.fatalErrorHandler = fatalErrorHandler;
+        this.completedCheckpointStore =
+                SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(
+                        jobGraph,
+                        configuration,
+                        userCodeClassLoader,
+                        checkpointRecoveryFactory,
+                        LOG);
+        this.checkpointIdCounter =
+                SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled(
+                        jobGraph, checkpointRecoveryFactory);
+        this.checkpointsCleaner = new CheckpointsCleaner();
+
+        this.slotAllocator =
+                new SlotSharingSlotAllocator(
+                        declarativeSlotPool::reserveFreeSlot,
+                        declarativeSlotPool::freeReservedSlot);
+
+        // Replace the default parallelism before deriving the job information from the graph, so
+        // JobGraphJobInformation observes the final parallelism regardless of whether it keeps a
+        // reference to the graph or snapshots it on construction.
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
+                vertex.setParallelism(1);
+            }
+        }
+
+        this.jobInformation = new JobGraphJobInformation(jobGraph);
+
+        this.componentMainThreadExecutor = mainThreadExecutor;
+        this.jobStatusListener = jobStatusListener;
+
+        this.scaleUpController = new ReactiveScaleUpController(configuration);
+
+        // Register last: the method reference captures 'this', which must not be handed to the
+        // slot pool before every final field has been assigned.
+        declarativeSlotPool.registerNewSlotsListener(this::newResourcesAvailable);
+    }
+
+    /**
+     * Validates that the given job can be executed by this scheduler.
+     *
+     * @param jobGraph job to validate
+     * @throws IllegalStateException if the job is not a streaming job, uses batch slot requests,
+     *     or contains a data exchange that is not pipelined
+     */
+    private static void ensureFullyPipelinedStreamingJob(JobGraph jobGraph) {
+        Preconditions.checkState(
+                jobGraph.getJobType() == JobType.STREAMING,
+                "The declarative scheduler only supports streaming jobs.");
+        Preconditions.checkState(
+                jobGraph.getScheduleMode()
+                        != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+                "The declarative scheduler does not support batch slot requests.");
+
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            for (JobEdge jobEdge : vertex.getInputs()) {
+                Preconditions.checkState(
+                        jobEdge.getSource().getResultType().isPipelined(),
+                        "The declarative scheduler only supports pipelined data exchanges (violated by %s -> %s).",
+                        jobEdge.getSource().getProducer(),
+                        jobEdge.getTarget().getID());
+            }
+        }
+    }
+
+    /**
+     * Slot-pool listener for newly registered slots. The slots are not inspected; the
+     * notification is forwarded to the current state via tryRun, targeting {@link
+     * ResourceConsumer}.
+     */
+    private void newResourcesAvailable(Collection<? extends PhysicalSlot> physicalSlots) {
+        state.tryRun(
+                ResourceConsumer.class,
+                resourceConsumer -> resourceConsumer.notifyNewResourcesAvailable(),
+                "newResourcesAvailable");
+    }
+
+    /**
+     * Starts scheduling; only permitted while the scheduler is still in the {@link Created} state.
+     *
+     * @throws IllegalStateException if the current state is not {@link Created}
+     */
+    @Override
+    public void startScheduling() {
+        final Created createdState =
+                state.as(Created.class)
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Can only start scheduling when being in Created state."));
+        createdState.startScheduling();
+    }
+
+    /** Hands the suspension request, with its cause, to the current state. */
+    @Override
+    public void suspend(Throwable cause) {
+        this.state.suspend(cause);
+    }
+
+    /** Hands the cancellation request to the current state. */
+    @Override
+    public void cancel() {
+        this.state.cancel();
+    }
+
+    /** Returns the scheduler's termination future (the same instance on every call). */
+    @Override
+    public CompletableFuture<Void> getTerminationFuture() {
+        return this.terminationFuture;
+    }
+
+    /** Hands a global failure to the current state. */
+    @Override
+    public void handleGlobalFailure(Throwable cause) {
+        this.state.handleGlobalFailure(cause);
+    }
+
+    /**
+     * Forwards a task execution state update to the current state if it has an execution graph.
+     *
+     * @return the state's answer, or {@code false} if the current state has no execution graph
+     */
+    @Override
+    public boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState) {
+        final Optional<Boolean> updateResult =
+                state.tryCall(
+                        StateWithExecutionGraph.class,
+                        graphState -> graphState.updateTaskExecutionState(taskExecutionState),
+                        "updateTaskExecutionState");
+        return updateResult.orElse(false);
+    }
+
+    /**
+     * Requests the next input split for the given vertex and execution attempt.
+     *
+     * @throws IOException if the current state has no execution graph
+     */
+    @Override
+    public SerializedInputSplit requestNextInputSplit(
+            JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException {
+        final Optional<SerializedInputSplit> nextSplit =
+                state.tryCall(
+                        StateWithExecutionGraph.class,
+                        graphState -> graphState.requestNextInputSplit(vertexID, executionAttempt),
+                        "requestNextInputSplit");
+        return nextSplit.orElseThrow(
+                () -> new IOException("Scheduler is currently not executing the job."));
+    }
+
+    /**
+     * Looks up the execution state of a result partition's producer.
+     *
+     * @throws PartitionProducerDisposedException if the current state has no execution graph
+     */
+    @Override
+    public ExecutionState requestPartitionState(
+            IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId)
+            throws PartitionProducerDisposedException {
+        final Optional<ExecutionState> producerState =
+                state.tryCall(
+                        StateWithExecutionGraph.class,
+                        graphState ->
+                                graphState.requestPartitionState(
+                                        intermediateResultId, resultPartitionId),
+                        "requestPartitionState");
+        return producerState.orElseThrow(
+                () -> new PartitionProducerDisposedException(resultPartitionId));
+    }
+
+    /** Forwards a partition-data-available notification to a state with an execution graph. */
+    @Override
+    public void notifyPartitionDataAvailable(ResultPartitionID partitionID) {
+        this.state.tryRun(
+                StateWithExecutionGraph.class,
+                graphState -> graphState.notifyPartitionDataAvailable(partitionID),
+                "notifyPartitionDataAvailable");
+    }
+
+    /** Returns the archived execution graph provided by the current state. */
+    @Override
+    public ArchivedExecutionGraph requestJob() {
+        return this.state.getJob();
+    }
+
+    /** Returns the job status reported by the current state. */
+    @Override
+    public JobStatus requestJobStatus() {
+        return this.state.getJobStatus();
+    }
+
+    /** Builds job details from the archived execution graph of the current state. */
+    @Override
+    public JobDetails requestJobDetails() {
+        final ArchivedExecutionGraph archivedJob = state.getJob();
+        return JobDetails.createDetailsForJob(archivedJob);
+    }
+
+    /**
+     * Looks up the location of a registered KvState.
+     *
+     * @throws UnknownKvStateLocation if the current state has no execution graph
+     */
+    @Override
+    public KvStateLocation requestKvStateLocation(JobID jobId, String registrationName)
+            throws UnknownKvStateLocation, FlinkJobNotFoundException {
+        final Optional<StateWithExecutionGraph> graphState =
+                state.as(StateWithExecutionGraph.class);
+
+        if (!graphState.isPresent()) {
+            throw new UnknownKvStateLocation(registrationName);
+        }
+
+        return graphState.get().requestKvStateLocation(jobId, registrationName);
+    }
+
+    /** Forwards a KvState registration to a state with an execution graph. */
+    @Override
+    public void notifyKvStateRegistered(
+            JobID jobId,
+            JobVertexID jobVertexId,
+            KeyGroupRange keyGroupRange,
+            String registrationName,
+            KvStateID kvStateId,
+            InetSocketAddress kvStateServerAddress)
+            throws FlinkJobNotFoundException {
+        this.state.tryRun(
+                StateWithExecutionGraph.class,
+                graphState ->
+                        graphState.notifyKvStateRegistered(
+                                jobId,
+                                jobVertexId,
+                                keyGroupRange,
+                                registrationName,
+                                kvStateId,
+                                kvStateServerAddress),
+                "notifyKvStateRegistered");
+    }
+
+    /** Forwards a KvState deregistration to a state with an execution graph. */
+    @Override
+    public void notifyKvStateUnregistered(
+            JobID jobId,
+            JobVertexID jobVertexId,
+            KeyGroupRange keyGroupRange,
+            String registrationName)
+            throws FlinkJobNotFoundException {
+        this.state.tryRun(
+                StateWithExecutionGraph.class,
+                graphState ->
+                        graphState.notifyKvStateUnregistered(
+                                jobId, jobVertexId, keyGroupRange, registrationName),
+                "notifyKvStateUnregistered");
+    }
+
+    /** Forwards an accumulator snapshot to a state with an execution graph. */
+    @Override
+    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
+        this.state.tryRun(
+                StateWithExecutionGraph.class,
+                graphState -> graphState.updateAccumulators(accumulatorSnapshot),
+                "updateAccumulators");
+    }
+
+    /**
+     * Triggers a savepoint through the current state if it has an execution graph.
+     *
+     * @return the future produced by the state, or a future completed exceptionally with a {@link
+     *     CheckpointException} if the job is currently not executing
+     */
+    @Override
+    public CompletableFuture<String> triggerSavepoint(
+            @Nullable String targetDirectory, boolean cancelJob) {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.triggerSavepoint(
+                                        targetDirectory, cancelJob),
+                        "triggerSavepoint")
+                // orElseGet: build the exception and the failed future only when actually needed,
+                // instead of on every call (orElse evaluates its argument eagerly).
+                .orElseGet(
+                        () ->
+                                FutureUtils.completedExceptionally(
+                                        new CheckpointException(
+                                                "The Flink job is currently not executing.",
+                                                CheckpointFailureReason
+                                                        .TRIGGER_CHECKPOINT_FAILURE)));
+    }
+
+    /** Forwards a checkpoint acknowledgement to a state with an execution graph. */
+    @Override
+    public void acknowledgeCheckpoint(
+            JobID jobID,
+            ExecutionAttemptID executionAttemptID,
+            long checkpointId,
+            CheckpointMetrics checkpointMetrics,
+            TaskStateSnapshot checkpointState) {
+        this.state.tryRun(
+                StateWithExecutionGraph.class,
+                graphState ->
+                        graphState.acknowledgeCheckpoint(
+                                jobID,
+                                executionAttemptID,
+                                checkpointId,
+                                checkpointMetrics,
+                                checkpointState),
+                "acknowledgeCheckpoint");
+    }
+
+    /**
+     * Forwards checkpoint metrics to a state with an execution graph. The {@code jobID} argument
+     * is not passed on.
+     */
+    @Override
+    public void reportCheckpointMetrics(
+            JobID jobID,
+            ExecutionAttemptID executionAttemptID,
+            long checkpointId,
+            CheckpointMetrics checkpointMetrics) {
+        this.state.tryRun(
+                StateWithExecutionGraph.class,
+                graphState ->
+                        graphState.reportCheckpointMetrics(
+                                executionAttemptID, checkpointId, checkpointMetrics),
+                "reportCheckpointMetrics");
+    }
+
+    /** Forwards a checkpoint decline message to a state with an execution graph. */
+    @Override
+    public void declineCheckpoint(DeclineCheckpoint decline) {
+        this.state.tryRun(
+                StateWithExecutionGraph.class,
+                graphState -> graphState.declineCheckpoint(decline),
+                "declineCheckpoint");
+    }
+
+    /**
+     * Stops the job with a savepoint through the current state if it has an execution graph.
+     *
+     * @return the future produced by the state, or a future completed exceptionally with a {@link
+     *     CheckpointException} if the job is currently not executing
+     */
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(
+            String targetDirectory, boolean advanceToEndOfEventTime) {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.stopWithSavepoint(
+                                        targetDirectory, advanceToEndOfEventTime),
+                        "stopWithSavepoint")
+                // orElseGet: build the exception and the failed future only when actually needed,
+                // instead of on every call (orElse evaluates its argument eagerly).
+                .orElseGet(
+                        () ->
+                                FutureUtils.completedExceptionally(
+                                        new CheckpointException(
+                                                "The Flink job is currently not executing.",
+                                                CheckpointFailureReason
+                                                        .TRIGGER_CHECKPOINT_FAILURE)));
+    }
+
+    /**
+     * Delivers an operator event to its coordinator.
+     *
+     * @throws TaskNotRunningException if the current state has no execution graph
+     */
+    @Override
+    public void deliverOperatorEventToCoordinator(
+            ExecutionAttemptID taskExecution, OperatorID operator, OperatorEvent evt)
+            throws FlinkException {
+        final Optional<StateWithExecutionGraph> graphState =
+                state.as(StateWithExecutionGraph.class);
+
+        if (!graphState.isPresent()) {
+            throw new TaskNotRunningException(
+                    "Task is not known or in state running on the JobManager.");
+        }
+
+        graphState.get().deliverOperatorEventToCoordinator(taskExecution, operator, evt);
+    }
+
+    /**
+     * Delivers a coordination request to an operator coordinator.
+     *
+     * @return the coordinator's response, or a future completed exceptionally with a {@link
+     *     FlinkException} if the current state has no execution graph
+     */
+    @Override
+    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+            OperatorID operator, CoordinationRequest request) throws FlinkException {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        graphState ->
+                                graphState.deliverCoordinationRequestToCoordinator(
+                                        operator, request),
+                        "deliverCoordinationRequestToCoordinator")
+                .orElseGet(
+                        () -> {
+                            final String message =
+                                    "Coordinator of operator " + operator + " does not exist";
+                            return FutureUtils.completedExceptionally(new FlinkException(message));
+                        });
+    }
+
+ // ----------------------------------------------------------------
+
+    /**
+     * Checks whether the slot pool's currently free slots can cover {@code desiredResources}.
+     *
+     * <p>Each free slot is matched against its exact resource profile if that profile is still
+     * missing; otherwise it is counted against {@link ResourceProfile#UNKNOWN}. Matching stops as
+     * soon as nothing is missing anymore.
+     */
+    @Override
+    public boolean hasEnoughResources(ResourceCounter desiredResources) {
+        ResourceCounter missingResources = desiredResources;
+
+        for (SlotInfo freeSlot : declarativeSlotPool.getFreeSlotsInformation()) {
+            if (missingResources.isEmpty()) {
+                break;
+            }
+
+            final ResourceProfile slotProfile = freeSlot.getResourceProfile();
+            final ResourceProfile matchedProfile =
+                    missingResources.containsResource(slotProfile)
+                            ? slotProfile
+                            : ResourceProfile.UNKNOWN;
+            missingResources = missingResources.subtract(matchedProfile, 1);
+        }
+
+        return missingResources.isEmpty();
+    }
+
+    /**
+     * Determines the vertex parallelism from the currently free slots and reserves slots for it.
+     *
+     * @param slotAllocator allocator used for both steps
+     * @return the reserved slot per execution vertex together with the per-vertex parallelism
+     * @throws JobExecutionException if the allocator cannot determine a parallelism for the free
+     *     slots
+     */
+    private <T extends VertexParallelism>
+            ParallelismAndResourceAssignments determineParallelismAndAssignResources(
+                    SlotAllocator<T> slotAllocator) throws JobExecutionException {
+        final Collection<? extends SlotInfo> freeSlots =
+                declarativeSlotPool.getFreeSlotsInformation();
+
+        final T parallelism =
+                slotAllocator
+                        .determineParallelism(jobInformation, freeSlots)
+                        .orElseThrow(
+                                () ->
+                                        new JobExecutionException(
+                                                jobInformation.getJobID(),
+                                                "Not enough resources available for scheduling."));
+
+        // Arguments are evaluated left to right: slots are reserved before the parallelism map is read.
+        return new ParallelismAndResourceAssignments(
+                slotAllocator.reserveResources(parallelism),
+                parallelism.getMaxParallelismForVertices());
+    }
+
+    @Override
+    public ExecutionGraph createExecutionGraphWithAvailableResources() throws Exception {
+        final ParallelismAndResourceAssignments parallelismAndResourceAssignments =
+                determineParallelismAndAssignResources(slotAllocator);
+
+        // Work on a copy so that the wrapped JobGraph keeps its originally configured parallelism.
+        final JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
+        for (JobVertex vertex : adjustedJobGraph.getVertices()) {
+            vertex.setParallelism(parallelismAndResourceAssignments.getParallelism(vertex.getID()));
+        }
+
+        final ExecutionGraph executionGraph = createExecutionGraphAndRestoreState(adjustedJobGraph);
+
+        executionGraph.start(componentMainThreadExecutor);
+        executionGraph.transitionToRunning();
+
+        executionGraph.setInternalTaskFailuresListener(
+                new UpdateSchedulerNgOnInternalFailuresListener(this, jobInformation.getJobID()));
+
+        assignSlots(executionGraph, parallelismAndResourceAssignments);
+        return executionGraph;
+    }
+
+    /**
+     * Registers the produced partitions of every execution vertex on the TaskManager of its
+     * reserved slot and assigns that slot to the vertex.
+     *
+     * @throws IllegalStateException if a partition registration does not complete synchronously
+     */
+    private static void assignSlots(
+            ExecutionGraph executionGraph,
+            ParallelismAndResourceAssignments parallelismAndResourceAssignments) {
+        for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
+            final LogicalSlot assignedSlot =
+                    parallelismAndResourceAssignments.getAssignedSlot(executionVertex.getID());
+            final CompletableFuture<?> registrationFuture =
+                    executionVertex
+                            .getCurrentExecutionAttempt()
+                            .registerProducedPartitions(
+                                    assignedSlot.getTaskManagerLocation(), false);
+            // Nothing waits on this future, so an asynchronously completing registration would go
+            // unnoticed. Fail fast instead of continuing with unregistered partitions.
+            if (!registrationFuture.isDone()) {
+                throw new IllegalStateException(
+                        "Partition registration must be completed immediately for the "
+                                + "declarative scheduler.");
+            }
+            executionVertex.tryAssignResource(assignedSlot);
+        }
+    }
+
+ private ExecutionGraph createExecutionGraphAndRestoreState(JobGraph adjustedJobGraph)
+ throws Exception {
+ ExecutionDeploymentListener executionDeploymentListener =
+ new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
+ ExecutionStateUpdateListener executionStateUpdateListener =
+ (execution, newState) -> {
+ if (newState.isTerminal()) {
+ executionDeploymentTracker.stopTrackingDeploymentOf(execution);
+ }
+ };
+
+ final ExecutionGraph newExecutionGraph =
+ ExecutionGraphBuilder.buildGraph(
+ adjustedJobGraph,
+ configuration,
+ futureExecutor,
+ ioExecutor,
+ userCodeClassLoader,
+ completedCheckpointStore,
+ checkpointsCleaner,
+ checkpointIdCounter,
+ rpcTimeout,
+ jobManagerJobMetricGroup,
+ blobWriter,
+ LOG,
+ shuffleMaster,
+ partitionTracker,
+ executionDeploymentListener,
+ executionStateUpdateListener,
+ initializationTimestamp);
+
+ final CheckpointCoordinator checkpointCoordinator =
+ newExecutionGraph.getCheckpointCoordinator();
+
+ if (checkpointCoordinator != null) {
+ // check whether we find a valid checkpoint
+ if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
+ new HashSet<>(newExecutionGraph.getAllVertices().values()))) {
+
+ // check whether we can restore from a savepoint
+ tryRestoreExecutionGraphFromSavepoint(
+ newExecutionGraph, adjustedJobGraph.getSavepointRestoreSettings());
+ }
+ }
+
+ return newExecutionGraph;
+ }
+
+    /**
+     * Restores the given {@link ExecutionGraph} from the savepoint described by the {@link
+     * SavepointRestoreSettings}. Does nothing if the settings do not request a savepoint restore or
+     * if the graph has no {@link CheckpointCoordinator} (checkpointing is not enabled).
+     *
+     * @param executionGraphToRestore {@link ExecutionGraph} whose state should be restored
+     * @param savepointRestoreSettings settings naming the savepoint path and whether non-restored
+     *     state is allowed
+     * @throws Exception if restoring the savepoint fails
+     */
+    private void tryRestoreExecutionGraphFromSavepoint(
+            ExecutionGraph executionGraphToRestore,
+            SavepointRestoreSettings savepointRestoreSettings)
+            throws Exception {
+        if (!savepointRestoreSettings.restoreSavepoint()) {
+            return;
+        }
+
+        final CheckpointCoordinator coordinator =
+                executionGraphToRestore.getCheckpointCoordinator();
+        if (coordinator == null) {
+            return;
+        }
+
+        coordinator.restoreSavepoint(
+                savepointRestoreSettings.getRestorePath(),
+                savepointRestoreSettings.allowNonRestoredState(),
+                executionGraphToRestore.getAllVertices(),
+                userCodeClassLoader);
+    }
+
+    @Override
+    public ArchivedExecutionGraph getArchivedExecutionGraph(
+            JobStatus jobStatus, @Nullable Throwable cause) {
+        // Built without an ExecutionGraph: only the job's identity, the given status/cause and the
+        // scheduler's initialization timestamp are recorded.
+        return ArchivedExecutionGraph.createFromInitializingJob(
+                jobInformation.getJobID(),
+                jobInformation.getName(),
+                jobStatus,
+                cause,
+                initializationTimestamp);
+    }
+
+    // TODO: add resourceTimeout parameter
+    /** Timeout handed to the {@link WaitingForResources} state; hard-coded for now. */
+    private static final Duration WAITING_FOR_RESOURCES_TIMEOUT = Duration.ofSeconds(10);
+
+    @Override
+    public void goToWaitingForResources() {
+        final ResourceCounter requiredResources = calculateDesiredResources();
+        // Announce the requirements to the slot pool before entering the waiting state.
+        declarativeSlotPool.setResourceRequirements(requiredResources);
+
+        transitionToState(
+                new WaitingForResources(
+                        this, LOG, requiredResources, WAITING_FOR_RESOURCES_TIMEOUT));
+    }
+
+ private ResourceCounter calculateDesiredResources() {
+ return slotAllocator.calculateRequiredSlots(jobInformation.getVertices());
+ }
+
+    @Override
+    public void goToExecuting(ExecutionGraph executionGraph) {
+        final ExecutionGraphHandler graphHandler =
+                new ExecutionGraphHandler(
+                        executionGraph, LOG, ioExecutor, componentMainThreadExecutor);
+
+        // The coordinators get handleGlobalFailure as failure callback and are initialized and
+        // started before the Executing state is entered.
+        final OperatorCoordinatorHandler coordinatorHandler =
+                new OperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure);
+        coordinatorHandler.initializeOperatorCoordinators(componentMainThreadExecutor);
+        coordinatorHandler.startAllOperatorCoordinators();
+
+        final Executing executing =
+                new Executing(
+                        executionGraph,
+                        graphHandler,
+                        coordinatorHandler,
+                        LOG,
+                        this,
+                        userCodeClassLoader);
+        transitionToState(executing);
+    }
+
+    @Override
+    public void goToCanceling(
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler) {
+        final Canceling canceling =
+                new Canceling(
+                        this,
+                        executionGraph,
+                        executionGraphHandler,
+                        operatorCoordinatorHandler,
+                        LOG);
+        transitionToState(canceling);
+    }
+
+    @Override
+    public void goToRestarting(
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Duration backoffTime) {
+        final Restarting restarting =
+                new Restarting(
+                        this,
+                        executionGraph,
+                        executionGraphHandler,
+                        operatorCoordinatorHandler,
+                        LOG,
+                        backoffTime);
+        transitionToState(restarting);
+    }
+
+    @Override
+    public void goToFailing(
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Throwable failureCause) {
+        final Failing failing =
+                new Failing(
+                        this,
+                        executionGraph,
+                        executionGraphHandler,
+                        operatorCoordinatorHandler,
+                        LOG,
+                        failureCause);
+        transitionToState(failing);
+    }
+
+    @Override
+    public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
+        final Finished finished = new Finished(this, archivedExecutionGraph, LOG);
+        transitionToState(finished);
+    }
+
+    @Override
+    public boolean canScaleUp(ExecutionGraph executionGraph) {
+        // Scaling up is only considered while some slot is not used by the job.
+        if (declarativeSlotPool.getFreeSlotsInformation().isEmpty()) {
+            return false;
+        }
+
+        // The candidate parallelism is computed from all slots, including the ones in use.
+        final Optional<? extends VertexParallelism> potentialNewParallelism =
+                slotAllocator.determineParallelism(
+                        jobInformation, declarativeSlotPool.getAllSlotsInformation());
+        if (!potentialNewParallelism.isPresent()) {
+            return false;
+        }
+
+        final int currentCumulativeParallelism = getCurrentCumulativeParallelism(executionGraph);
+        final int newCumulativeParallelism =
+                getCumulativeParallelism(potentialNewParallelism.get());
+        if (newCumulativeParallelism <= currentCumulativeParallelism) {
+            return false;
+        }
+
+        LOG.debug(
+                "Offering scale up to scale up controller with currentCumulativeParallelism={}, newCumulativeParallelism={}",
+                currentCumulativeParallelism,
+                newCumulativeParallelism);
+        return scaleUpController.canScaleUp(currentCumulativeParallelism, newCumulativeParallelism);
+    }
+
+    /** Sums up the parallelism of all job vertices of the given execution graph. */
+    private static int getCurrentCumulativeParallelism(ExecutionGraph executionGraph) {
+        return executionGraph.getAllVertices().values().stream()
+                .mapToInt(ExecutionJobVertex::getParallelism)
+                .sum();
+    }
+
+    /** Sums up the per-vertex parallelism contained in the given {@link VertexParallelism}. */
+    private static int getCumulativeParallelism(VertexParallelism potentialNewParallelism) {
+        return potentialNewParallelism.getMaxParallelismForVertices().values().stream()
+                .mapToInt(Integer::intValue)
+                .sum();
+    }
+
+    @Override
+    public void onFinished(ArchivedExecutionGraph archivedExecutionGraph) {
+        final JobStatus terminalState = archivedExecutionGraph.getState();
+        stopCheckpointServicesSafely(terminalState);
+
+        if (jobStatusListener == null) {
+            return;
+        }
+
+        // Report the final status, its timestamp and, if present, the failure cause.
+        final Throwable failureCause =
+                archivedExecutionGraph.getFailureInfo() == null
+                        ? null
+                        : archivedExecutionGraph.getFailureInfo().getException();
+        jobStatusListener.jobStatusChanges(
+                jobInformation.getJobID(),
+                terminalState,
+                archivedExecutionGraph.getStatusTimestamp(terminalState),
+                failureCause);
+    }
+
+    /**
+     * Shuts down the completed checkpoint store and the checkpoint id counter for the given
+     * terminal state. Both shutdowns are always attempted; failures are combined and only logged.
+     */
+    private void stopCheckpointServicesSafely(JobStatus terminalState) {
+        Exception shutdownFailure = null;
+
+        try {
+            completedCheckpointStore.shutdown(terminalState, checkpointsCleaner);
+        } catch (Exception storeFailure) {
+            shutdownFailure = storeFailure;
+        }
+
+        try {
+            checkpointIdCounter.shutdown(terminalState);
+        } catch (Exception counterFailure) {
+            // Keep the first failure and attach later ones as suppressed exceptions.
+            shutdownFailure = ExceptionUtils.firstOrSuppressed(counterFailure, shutdownFailure);
+        }
+
+        if (shutdownFailure == null) {
+            return;
+        }
+        LOG.warn("Failed to stop checkpoint services.", shutdownFailure);
+    }
+
+    @Override
+    public Executing.FailureResult howToHandleFailure(Throwable failure) {
+        // Unrecoverable errors bypass the restart strategy entirely.
+        if (ExecutionFailureHandler.isUnrecoverableError(failure)) {
+            return Executing.FailureResult.canNotRestart(
+                    new JobException("The failure is not recoverable", failure));
+        }
+
+        restartBackoffTimeStrategy.notifyFailure(failure);
+        if (!restartBackoffTimeStrategy.canRestart()) {
+            return Executing.FailureResult.canNotRestart(
+                    new JobException(
+                            "Recovery is suppressed by " + restartBackoffTimeStrategy, failure));
+        }
+
+        final Duration backoffTime = Duration.ofMillis(restartBackoffTimeStrategy.getBackoffTime());
+        return Executing.FailureResult.canRestart(backoffTime);
+    }
+
+    @Override
+    public Executor getMainThreadExecutor() {
+        // The component's main thread executor doubles as the executor for state actions.
+        return this.componentMainThreadExecutor;
+    }
+
+    @Override
+    public boolean isState(State expectedState) {
+        // Identity comparison: only the very same state instance counts as a match.
+        return this.state == expectedState;
+    }
+
+ @Override
+ public void runIfState(State expectedState, Runnable action) {
+ if (isState(expectedState)) {
+ try {
+ action.run();
+ } catch (Throwable t) {
+ fatalErrorHandler.onFatalError(t);
+ }
+ } else {
+ LOG.debug(
+ "Ignoring scheduled action because expected state {} is not the actual state {}.",
+ expectedState,
+ state);
+ }
+ }
+
+    @Override
+    public void runIfState(State expectedState, Runnable action, Duration delay) {
+        // The state is checked when the action fires, not when it is scheduled.
+        final Runnable guardedAction = () -> runIfState(expectedState, action);
+        componentMainThreadExecutor.schedule(
+                guardedAction, delay.toMillis(), TimeUnit.MILLISECONDS);
+    }
+
+ // ----------------------------------------------------------------
+
+ @VisibleForTesting
+ void transitionToState(State newState) {
+ if (state != newState) {
+ LOG.debug(
+ "Transition from state {} to {}.",
+ state.getClass().getSimpleName(),
+ newState.getClass().getSimpleName());
+
+ State oldState = state;
+ oldState.onLeave(newState.getClass());
+
+ state = newState;
+ newState.onEnter();
+ }
+ }
+
+    /** Returns the state the scheduler is currently in. */
+    @VisibleForTesting
+    State getState() {
+        return this.state;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerFactory.java
new file mode 100644
index 0000000..5e7d630
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+/** Factory which creates a {@link DeclarativeScheduler} for a given {@link JobGraph}. */
+public class DeclarativeSchedulerFactory implements SchedulerNGFactory {
+
+    @Override
+    public SchedulerNG createInstance(
+            Logger log,
+            JobGraph jobGraph,
+            Executor ioExecutor,
+            Configuration jobMasterConfiguration,
+            SlotPoolService slotPoolService,
+            ScheduledExecutorService futureExecutor,
+            ClassLoader userCodeLoader,
+            CheckpointRecoveryFactory checkpointRecoveryFactory,
+            Time rpcTimeout,
+            BlobWriter blobWriter,
+            JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            Time slotRequestTimeout,
+            ShuffleMaster<?> shuffleMaster,
+            JobMasterPartitionTracker partitionTracker,
+            ExecutionDeploymentTracker executionDeploymentTracker,
+            long initializationTimestamp,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            FatalErrorHandler fatalErrorHandler,
+            JobStatusListener jobStatusListener)
+            throws Exception {
+        // Note: slotRequestTimeout is not forwarded to the DeclarativeScheduler.
+        final DeclarativeSlotPool declarativeSlotPool = getDeclarativeSlotPool(slotPoolService);
+
+        final RestartBackoffTimeStrategy restartBackoffTimeStrategy =
+                createRestartBackoffTimeStrategy(jobGraph, jobMasterConfiguration, userCodeLoader);
+        log.info(
+                "Using restart back off time strategy {} for {} ({}).",
+                restartBackoffTimeStrategy,
+                jobGraph.getName(),
+                jobGraph.getJobID());
+
+        return new DeclarativeScheduler(
+                jobGraph,
+                jobMasterConfiguration,
+                declarativeSlotPool,
+                futureExecutor,
+                ioExecutor,
+                userCodeLoader,
+                checkpointRecoveryFactory,
+                rpcTimeout,
+                blobWriter,
+                jobManagerJobMetricGroup,
+                shuffleMaster,
+                partitionTracker,
+                restartBackoffTimeStrategy,
+                executionDeploymentTracker,
+                initializationTimestamp,
+                mainThreadExecutor,
+                fatalErrorHandler,
+                jobStatusListener);
+    }
+
+    /** Returns the {@link DeclarativeSlotPool} behind the given service or fails if there is none. */
+    private static DeclarativeSlotPool getDeclarativeSlotPool(SlotPoolService slotPoolService) {
+        return slotPoolService
+                .castInto(DeclarativeSlotPool.class)
+                .orElseThrow(
+                        () ->
+                                new IllegalStateException(
+                                        "The DeclarativeScheduler requires a DeclarativeSlotPool."));
+    }
+
+    /**
+     * Creates the restart back off time strategy from the restart strategy stored in the job's
+     * serialized execution config and from the job master configuration.
+     */
+    private static RestartBackoffTimeStrategy createRestartBackoffTimeStrategy(
+            JobGraph jobGraph, Configuration jobMasterConfiguration, ClassLoader userCodeLoader)
+            throws Exception {
+        return RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(
+                        jobGraph.getSerializedExecutionConfig()
+                                .deserializeValue(userCodeLoader)
+                                .getRestartStrategy(),
+                        jobMasterConfiguration,
+                        jobGraph.isCheckpointingEnabled())
+                .create();
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/JobGraphJobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/JobGraphJobInformation.java
new file mode 100644
index 0000000..8139321
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/JobGraphJobInformation.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.declarative.allocator.JobInformation;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * {@link JobInformation} created from a {@link JobGraph}.
+ *
+ * <p>The vertex information handed out reads directly from the wrapped job graph; use {@link
+ * #copyJobGraph()} to obtain a graph that can be mutated independently.
+ */
+public class JobGraphJobInformation implements JobInformation {
+
+    private final JobGraph jobGraph;
+    private final JobID jobID;
+    private final String name;
+
+    public JobGraphJobInformation(JobGraph jobGraph) {
+        this.jobGraph = jobGraph;
+        this.jobID = jobGraph.getJobID();
+        this.name = jobGraph.getName();
+    }
+
+    @Override
+    public Collection<SlotSharingGroup> getSlotSharingGroups() {
+        return jobGraph.getSlotSharingGroups();
+    }
+
+    /**
+     * Returns the information for the job vertex with the given id.
+     *
+     * @param jobVertexId id of a vertex of the wrapped job graph
+     * @return information about that vertex
+     * @throws IllegalArgumentException if the job graph contains no vertex with the given id
+     */
+    @Override
+    public JobInformation.VertexInformation getVertexInformation(JobVertexID jobVertexId) {
+        final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
+        // Fail fast here instead of handing out a wrapper that would throw an NPE on first use.
+        if (jobVertex == null) {
+            throw new IllegalArgumentException(
+                    "The job graph does not contain a vertex with id " + jobVertexId + '.');
+        }
+        return new JobVertexInformation(jobVertex);
+    }
+
+    public JobID getJobID() {
+        return jobID;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    /** Returns a lazily transformed view of the job graph's vertices. */
+    public Iterable<JobInformation.VertexInformation> getVertices() {
+        return jobGraphVerticesToVertexInformation(jobGraph.getVertices());
+    }
+
+    /** Wraps each given {@link JobVertex} into a {@link JobInformation.VertexInformation}. */
+    public static Iterable<JobInformation.VertexInformation> jobGraphVerticesToVertexInformation(
+            Iterable<JobVertex> verticesIterable) {
+        return Iterables.transform(verticesIterable, JobVertexInformation::new);
+    }
+
+    /**
+     * Returns a copy of the job graph that can be mutated without affecting this instance.
+     *
+     * <p>The copy is created via Java serialization ({@link InstantiationUtil#clone}).
+     */
+    public JobGraph copyJobGraph() throws IOException, ClassNotFoundException {
+        return InstantiationUtil.clone(jobGraph);
+    }
+
+    /** {@link JobInformation.VertexInformation} backed by a {@link JobVertex}. */
+    private static final class JobVertexInformation implements JobInformation.VertexInformation {
+
+        private final JobVertex jobVertex;
+
+        private JobVertexInformation(JobVertex jobVertex) {
+            this.jobVertex = jobVertex;
+        }
+
+        @Override
+        public JobVertexID getJobVertexID() {
+            return jobVertex.getID();
+        }
+
+        @Override
+        public int getParallelism() {
+            return jobVertex.getParallelism();
+        }
+
+        @Override
+        public SlotSharingGroup getSlotSharingGroup() {
+            return jobVertex.getSlotSharingGroup();
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/ParallelismAndResourceAssignments.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/ParallelismAndResourceAssignments.java
new file mode 100644
index 0000000..f2527ae
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/ParallelismAndResourceAssignments.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+
+/** Assignment of slots to execution vertices together with the parallelism of each job vertex. */
+public final class ParallelismAndResourceAssignments {
+    private final Map<ExecutionVertexID, ? extends LogicalSlot> assignedSlots;
+
+    private final Map<JobVertexID, Integer> parallelismPerJobVertex;
+
+    /**
+     * @param assignedSlots slot assigned to each execution vertex
+     * @param parallelismPerJobVertex parallelism determined for each job vertex
+     */
+    public ParallelismAndResourceAssignments(
+            Map<ExecutionVertexID, ? extends LogicalSlot> assignedSlots,
+            Map<JobVertexID, Integer> parallelismPerJobVertex) {
+        this.assignedSlots = Preconditions.checkNotNull(assignedSlots, "assignedSlots");
+        this.parallelismPerJobVertex =
+                Preconditions.checkNotNull(parallelismPerJobVertex, "parallelismPerJobVertex");
+    }
+
+    /**
+     * Returns the parallelism determined for the given job vertex.
+     *
+     * @throws IllegalStateException if no parallelism was determined for the vertex
+     */
+    public int getParallelism(JobVertexID jobVertexId) {
+        Preconditions.checkState(
+                parallelismPerJobVertex.containsKey(jobVertexId),
+                "No parallelism has been determined for job vertex %s.",
+                jobVertexId);
+        return parallelismPerJobVertex.get(jobVertexId);
+    }
+
+    /**
+     * Returns the slot assigned to the given execution vertex.
+     *
+     * @throws IllegalStateException if no slot was assigned to the vertex
+     */
+    public LogicalSlot getAssignedSlot(ExecutionVertexID executionVertexId) {
+        Preconditions.checkState(
+                assignedSlots.containsKey(executionVertexId),
+                "No slot has been assigned to execution vertex %s.",
+                executionVertexId);
+        return assignedSlots.get(executionVertexId);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java
index f9c9015..a44fe2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java
@@ -24,6 +24,14 @@ import java.util.Collection;
/** Information about the job. */
public interface JobInformation {
+ /**
+ * Returns all slot-sharing groups of the job.
+ *
+ * <p>Attention: The returned slot sharing groups are mutable, but callers must never modify
+ * them!
+ *
+ * @return all slot-sharing groups of the job
+ */
Collection<SlotSharingGroup> getSlotSharingGroups();
VertexInformation getVertexInformation(JobVertexID jobVertexId);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
index 5b41bad..2dce418 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
@@ -39,9 +39,9 @@ import java.util.Map;
*
* <p>The release process of a shared slot follows one of 2 code paths:
*
- * <p>1)During normal execution all allocated logical slots will be returned, with the last return
+ * <p>1) During normal execution all allocated logical slots will be returned, with the last return
* triggering the {@code externalReleaseCallback} which must eventually result in a {@link
- * #release(Throwable)} call. 2)
+ * #release(Throwable)} call.
*
* <p>2) If the backing physical is lost (e.g., because the providing TaskManager crashed) then
* {@link #release(Throwable)} is called without all logical slots having been returned. The runtime
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
index 2ef0e47..810666a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
@@ -21,12 +21,15 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import org.apache.flink.runtime.scheduler.declarative.DeclarativeSchedulerFactory;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
@@ -65,7 +68,18 @@ public class SchedulerNGFactoryFactoryTest extends TestLogger {
}
}
+    @Test
+    public void fallBackToNonDeclarativeSchedulerForBatchJobsIfDeclarativeIsConfigured() {
+        final Configuration declarativeConfiguration = new Configuration();
+        declarativeConfiguration.set(
+                JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Declarative);
+
+        // Even though the declarative scheduler is configured, it must not be chosen for a batch job.
+        final SchedulerNGFactory batchSchedulerFactory =
+                SchedulerNGFactoryFactory.createSchedulerNGFactory(
+                        declarativeConfiguration, JobType.BATCH);
+
+        assertThat(batchSchedulerFactory, not(instanceOf(DeclarativeSchedulerFactory.class)));
+    }
+
private static SchedulerNGFactory createSchedulerNGFactory(final Configuration configuration) {
- return SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
+ return SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration, JobType.BATCH);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index d0d25bc..3270119 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -292,7 +292,8 @@ public class JobMasterTest extends TestLogger {
JobMasterConfiguration.fromConfiguration(configuration);
final SchedulerNGFactory schedulerNGFactory =
- SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
+ SchedulerNGFactoryFactory.createSchedulerNGFactory(
+ configuration, jobGraph.getJobType());
final JobMaster jobMaster =
new JobMaster(
@@ -302,7 +303,8 @@ public class JobMasterTest extends TestLogger {
jmResourceId,
jobGraph,
haServices,
- SlotPoolServiceFactory.fromConfiguration(configuration),
+ SlotPoolServiceFactory.fromConfiguration(
+ configuration, jobGraph.getJobType()),
jobManagerSharedServices,
heartbeatServices,
UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
index d97778e..bb46435 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
@@ -553,7 +553,7 @@ public class DefaultDeclarativeSlotPoolTest extends TestLogger {
}
@Nonnull
- private static Collection<SlotOffer> createSlotOffersForResourceRequirements(
+ public static Collection<SlotOffer> createSlotOffersForResourceRequirements(
ResourceCounter resourceRequirements) {
Collection<SlotOffer> slotOffers = new ArrayList<>();
int slotIndex = 0;
@@ -608,7 +608,7 @@ public class DefaultDeclarativeSlotPoolTest extends TestLogger {
}
@Nonnull
- static Collection<SlotOffer> offerSlots(
+ public static Collection<SlotOffer> offerSlots(
DeclarativeSlotPool slotPool, Collection<SlotOffer> slotOffers) {
return slotPool.offerSlots(
slotOffers, new LocalTaskManagerLocation(), createTaskManagerGateway(null), 0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
index 26bbc09..c5c10ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
@@ -195,7 +195,8 @@ public class JobMasterBuilder {
highAvailabilityServices,
slotPoolFactory != null
? slotPoolFactory
- : SlotPoolServiceFactory.fromConfiguration(configuration),
+ : SlotPoolServiceFactory.fromConfiguration(
+ configuration, jobGraph.getJobType()),
jobManagerSharedServices,
heartbeatServices,
UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
@@ -204,7 +205,8 @@ public class JobMasterBuilder {
JobMasterBuilder.class.getClassLoader(),
schedulerFactory != null
? schedulerFactory
- : SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration),
+ : SchedulerNGFactoryFactory.createSchedulerNGFactory(
+ configuration, jobGraph.getJobType()),
shuffleMaster,
partitionTrackerFactory,
executionDeploymentTracker,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerBuilder.java
new file mode 100644
index 0000000..192db57
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerBuilder.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+/** Builder for {@link DeclarativeScheduler} instances with test-friendly defaults. */
+public class DeclarativeSchedulerBuilder {
+    private static final Time DEFAULT_TIMEOUT = Time.seconds(300);
+
+    private final JobGraph jobGraph;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    private Executor ioExecutor = TestingUtils.defaultExecutor();
+    private Configuration jobMasterConfiguration = new Configuration();
+    private ScheduledExecutorService futureExecutor = TestingUtils.defaultExecutor();
+    private ClassLoader userCodeLoader = ClassLoader.getSystemClassLoader();
+    private CheckpointRecoveryFactory checkpointRecoveryFactory =
+            new StandaloneCheckpointRecoveryFactory();
+    // Stays null unless setDeclarativeSlotPool is called; build() then creates a default pool so
+    // that the pool picks up the rpcTimeout configured at build time (not at construction time).
+    private DeclarativeSlotPool declarativeSlotPool;
+    private Time rpcTimeout = DEFAULT_TIMEOUT;
+    private BlobWriter blobWriter = VoidBlobWriter.getInstance();
+    private JobManagerJobMetricGroup jobManagerJobMetricGroup =
+            UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
+    private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE;
+    private JobMasterPartitionTracker partitionTracker = NoOpJobMasterPartitionTracker.INSTANCE;
+    private RestartBackoffTimeStrategy restartBackoffTimeStrategy =
+            NoRestartBackoffTimeStrategy.INSTANCE;
+    private FatalErrorHandler fatalErrorHandler =
+            error ->
+                    FatalExitExceptionHandler.INSTANCE.uncaughtException(
+                            Thread.currentThread(), error);
+    private JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC, ignoredD) -> {};
+
+    /**
+     * Creates a builder for the given job.
+     *
+     * @param jobGraph job to be scheduled
+     * @param mainThreadExecutor executor passed to the scheduler as its main thread executor
+     */
+    public DeclarativeSchedulerBuilder(
+            final JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor) {
+        this.jobGraph = jobGraph;
+        this.mainThreadExecutor = mainThreadExecutor;
+    }
+
+    /** Sets the executor used for I/O operations. */
+    public DeclarativeSchedulerBuilder setIoExecutor(final Executor ioExecutor) {
+        this.ioExecutor = ioExecutor;
+        return this;
+    }
+
+    /** Sets the job master configuration handed to the scheduler. */
+    public DeclarativeSchedulerBuilder setJobMasterConfiguration(
+            final Configuration jobMasterConfiguration) {
+        this.jobMasterConfiguration = jobMasterConfiguration;
+        return this;
+    }
+
+    /** Sets the scheduled executor handed to the scheduler as its future executor. */
+    public DeclarativeSchedulerBuilder setFutureExecutor(
+            final ScheduledExecutorService futureExecutor) {
+        this.futureExecutor = futureExecutor;
+        return this;
+    }
+
+    /** Sets the user code class loader. */
+    public DeclarativeSchedulerBuilder setUserCodeLoader(final ClassLoader userCodeLoader) {
+        this.userCodeLoader = userCodeLoader;
+        return this;
+    }
+
+    /** Sets the checkpoint recovery factory. */
+    public DeclarativeSchedulerBuilder setCheckpointRecoveryFactory(
+            final CheckpointRecoveryFactory checkpointRecoveryFactory) {
+        this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+        return this;
+    }
+
+    /**
+     * Sets the RPC timeout. The timeout is also used by the default slot pool, provided no slot
+     * pool was set explicitly via {@link #setDeclarativeSlotPool(DeclarativeSlotPool)}.
+     */
+    public DeclarativeSchedulerBuilder setRpcTimeout(final Time rpcTimeout) {
+        this.rpcTimeout = rpcTimeout;
+        return this;
+    }
+
+    /** Sets the blob writer. */
+    public DeclarativeSchedulerBuilder setBlobWriter(final BlobWriter blobWriter) {
+        this.blobWriter = blobWriter;
+        return this;
+    }
+
+    /** Sets the job metric group. */
+    public DeclarativeSchedulerBuilder setJobManagerJobMetricGroup(
+            final JobManagerJobMetricGroup jobManagerJobMetricGroup) {
+        this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+        return this;
+    }
+
+    /** Sets the shuffle master. */
+    public DeclarativeSchedulerBuilder setShuffleMaster(final ShuffleMaster<?> shuffleMaster) {
+        this.shuffleMaster = shuffleMaster;
+        return this;
+    }
+
+    /** Sets the partition tracker. */
+    public DeclarativeSchedulerBuilder setPartitionTracker(
+            final JobMasterPartitionTracker partitionTracker) {
+        this.partitionTracker = partitionTracker;
+        return this;
+    }
+
+    /**
+     * Sets the slot pool used by the scheduler. If never set (or set to {@code null}), {@link
+     * #build()} creates a {@link DefaultDeclarativeSlotPool}.
+     */
+    public DeclarativeSchedulerBuilder setDeclarativeSlotPool(
+            DeclarativeSlotPool declarativeSlotPool) {
+        this.declarativeSlotPool = declarativeSlotPool;
+        return this;
+    }
+
+    /** Sets the restart backoff time strategy. */
+    public DeclarativeSchedulerBuilder setRestartBackoffTimeStrategy(
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+        this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+        return this;
+    }
+
+    /** Sets the fatal error handler; the default delegates to {@link FatalExitExceptionHandler}. */
+    public DeclarativeSchedulerBuilder setFatalErrorHandler(FatalErrorHandler fatalErrorHandler) {
+        this.fatalErrorHandler = fatalErrorHandler;
+        return this;
+    }
+
+    /** Sets the job status listener; the default ignores all notifications. */
+    public DeclarativeSchedulerBuilder setJobStatusListener(JobStatusListener jobStatusListener) {
+        this.jobStatusListener = jobStatusListener;
+        return this;
+    }
+
+    /**
+     * Creates the {@link DeclarativeScheduler} from the configured values.
+     *
+     * @return a new scheduler instance
+     * @throws Exception if the scheduler constructor fails
+     */
+    public DeclarativeScheduler build() throws Exception {
+        return new DeclarativeScheduler(
+                jobGraph,
+                jobMasterConfiguration,
+                declarativeSlotPool != null
+                        ? declarativeSlotPool
+                        : createDefaultDeclarativeSlotPool(),
+                futureExecutor,
+                ioExecutor,
+                userCodeLoader,
+                checkpointRecoveryFactory,
+                rpcTimeout,
+                blobWriter,
+                jobManagerJobMetricGroup,
+                shuffleMaster,
+                partitionTracker,
+                restartBackoffTimeStrategy,
+                new DefaultExecutionDeploymentTracker(),
+                System.currentTimeMillis(),
+                mainThreadExecutor,
+                fatalErrorHandler,
+                jobStatusListener);
+    }
+
+    /** Creates the default slot pool for the job, using the currently configured rpcTimeout. */
+    private DeclarativeSlotPool createDefaultDeclarativeSlotPool() {
+        return new DefaultDeclarativeSlotPool(
+                jobGraph.getJobID(),
+                new DefaultAllocatedSlotPool(),
+                ignored -> {},
+                DEFAULT_TIMEOUT,
+                rpcTimeout);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerClusterITCase.java
new file mode 100644
index 0000000..f7e528d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerClusterITCase.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testtasks.OnceBlockingNoOpInvokable;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * This class contains integration tests for the declarative scheduler which start a {@link
+ * org.apache.flink.runtime.minicluster.MiniCluster} per test case.
+ */
+public class DeclarativeSchedulerClusterITCase extends TestLogger {
+
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int NUMBER_TASK_MANAGERS = 2;
+    private static final int PARALLELISM = NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS;
+
+    private final Configuration configuration = createConfiguration();
+
+    @Rule
+    public final MiniClusterResource miniClusterResource =
+            new MiniClusterResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(configuration)
+                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+                            .build());
+
+    /** Enables the declarative scheduler together with declarative resource management. */
+    private static Configuration createConfiguration() {
+        final Configuration configuration = new Configuration();
+
+        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Declarative);
+        configuration.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return configuration;
+    }
+
+    /**
+     * Runs a blocking job on all available slots, terminates one TaskManager while it runs and
+     * checks that the job still finishes successfully.
+     */
+    @Test
+    public void testAutomaticScaleDownInCaseOfLostSlots() throws InterruptedException, IOException {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);
+
+        miniCluster.submitJob(jobGraph).join();
+        final CompletableFuture<JobResult> resultFuture =
+                miniCluster.requestJobResult(jobGraph.getJobID());
+
+        OnceBlockingNoOpInvokable.waitUntilOpsAreRunning();
+
+        miniCluster.terminateTaskManager(0);
+
+        final JobResult jobResult = resultFuture.join();
+
+        assertTrue(jobResult.isSuccess());
+    }
+
+    /**
+     * Submits a job whose parallelism exceeds the initially available slots, then starts an
+     * additional TaskManager and checks that the job is restarted with the full parallelism.
+     */
+    @Test
+    public void testAutomaticScaleUp() throws Exception {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final int initialInstanceCount = NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS;
+        final int targetInstanceCount =
+                NUMBER_SLOTS_PER_TASK_MANAGER * (NUMBER_TASK_MANAGERS + 1);
+        final JobGraph jobGraph = createBlockingJobGraph(targetInstanceCount);
+
+        // only NUMBER_TASK_MANAGERS * NUMBER_SLOTS_PER_TASK_MANAGER slots exist initially, so
+        // expect only that many instances to be running at first
+        OnceBlockingNoOpInvokable.resetFor(initialInstanceCount);
+
+        log.info(
+                "Submitting job with parallelism of {}, to a cluster with only {} TMs.",
+                targetInstanceCount,
+                NUMBER_TASK_MANAGERS);
+        miniCluster.submitJob(jobGraph).join();
+        final CompletableFuture<JobResult> jobResultFuture =
+                miniCluster.requestJobResult(jobGraph.getJobID());
+
+        OnceBlockingNoOpInvokable.waitUntilOpsAreRunning();
+
+        log.info("Start additional TaskManager to scale up to the full parallelism.");
+        OnceBlockingNoOpInvokable.resetInstanceCount(); // we expect a restart
+        OnceBlockingNoOpInvokable.resetFor(targetInstanceCount);
+        miniCluster.startTaskManager();
+
+        log.info("Waiting until Invokable is running with higher parallelism");
+        OnceBlockingNoOpInvokable.waitUntilOpsAreRunning();
+
+        assertEquals(targetInstanceCount, OnceBlockingNoOpInvokable.getInstanceCount());
+
+        assertTrue(jobResultFuture.join().isSuccess());
+    }
+
+    /**
+     * Creates a streaming job with a single {@link OnceBlockingNoOpInvokable} vertex of the given
+     * parallelism and a fixed-delay restart strategy allowing one restart.
+     *
+     * @throws IOException if the execution config cannot be set on the job graph
+     */
+    private JobGraph createBlockingJobGraph(int parallelism) throws IOException {
+        final JobVertex blockingOperator = new JobVertex("Blocking operator");
+
+        OnceBlockingNoOpInvokable.resetFor(parallelism);
+        blockingOperator.setInvokableClass(OnceBlockingNoOpInvokable.class);
+
+        blockingOperator.setParallelism(parallelism);
+
+        final JobGraph jobGraph = new JobGraph("Blocking job.", blockingOperator);
+        jobGraph.setJobType(JobType.STREAMING);
+
+        final ExecutionConfig executionConfig = new ExecutionConfig();
+        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
+        jobGraph.setExecutionConfig(executionConfig);
+
+        return jobGraph;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSimpleITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSimpleITCase.java
new file mode 100644
index 0000000..290ae9c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSimpleITCase.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/** Integration tests for the declarative scheduler. */
+public class DeclarativeSchedulerSimpleITCase extends TestLogger {
+
+    private static final int NUMBER_TASK_MANAGERS = 2;
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int PARALLELISM = 10;
+
+    private static final Configuration configuration = getConfiguration();
+
+    /** Enables the declarative scheduler together with declarative resource management. */
+    private static Configuration getConfiguration() {
+        final Configuration configuration = new Configuration();
+
+        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Declarative);
+        configuration.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return configuration;
+    }
+
+    @ClassRule
+    public static final MiniClusterResource MINI_CLUSTER_RESOURCE =
+            new MiniClusterResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(configuration)
+                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .build());
+
+    /** Runs a simple source -> sink job and checks that it finishes successfully. */
+    @Test
+    public void testSchedulingOfSimpleJob() throws Exception {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+        final JobGraph jobGraph = createJobGraph();
+
+        miniCluster.submitJob(jobGraph).join();
+
+        final JobResult jobResult = miniCluster.requestJobResult(jobGraph.getJobID()).join();
+
+        // the conversion throws an exception if the job failed
+        final JobExecutionResult jobExecutionResult =
+                jobResult.toJobExecutionResult(getClass().getClassLoader());
+
+        assertTrue(jobResult.isSuccess());
+    }
+
+    /**
+     * Creates a streaming source -> sink job of {@link NoOpInvokable} vertices connected
+     * pointwise.
+     */
+    private JobGraph createJobGraph() {
+        final JobVertex source = new JobVertex("Source");
+        source.setInvokableClass(NoOpInvokable.class);
+        source.setParallelism(PARALLELISM);
+
+        final JobVertex sink = new JobVertex("sink");
+        sink.setInvokableClass(NoOpInvokable.class);
+        sink.setParallelism(PARALLELISM);
+
+        sink.connectNewDataSetAsInput(
+                source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+        final JobGraph jobGraph = new JobGraph("Simple job", source, sink);
+        // The job type takes part in choosing the scheduler factory, so mark the job as
+        // streaming, as all other declarative scheduler tests do.
+        jobGraph.setJobType(JobType.STREAMING);
+        return jobGraph;
+    }
+
+    /** Runs a job whose only task fails once and checks that the restart makes it succeed. */
+    @Test
+    public void testGlobalFailoverIfTaskFails() throws IOException {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+        final JobGraph jobGraph = createOnceFailingJobGraph();
+
+        miniCluster.submitJob(jobGraph).join();
+
+        final JobResult jobResult = miniCluster.requestJobResult(jobGraph.getJobID()).join();
+
+        assertTrue(jobResult.isSuccess());
+    }
+
+    /**
+     * Creates a streaming job with a single {@link OnceFailingInvokable} vertex and a fixed-delay
+     * restart strategy allowing one restart.
+     *
+     * @throws IOException if the execution config cannot be set on the job graph
+     */
+    private JobGraph createOnceFailingJobGraph() throws IOException {
+        final JobVertex onceFailingOperator = new JobVertex("Once failing operator");
+
+        OnceFailingInvokable.reset();
+        onceFailingOperator.setInvokableClass(OnceFailingInvokable.class);
+
+        onceFailingOperator.setParallelism(1);
+        final JobGraph jobGraph = new JobGraph("Once failing job", onceFailingOperator);
+        jobGraph.setJobType(JobType.STREAMING);
+        final ExecutionConfig executionConfig = new ExecutionConfig();
+        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
+        jobGraph.setExecutionConfig(executionConfig);
+        return jobGraph;
+    }
+
+    /**
+     * {@link AbstractInvokable} whose subtask 0 throws on its first invocation (tracked in a
+     * static flag) and completes normally afterwards.
+     */
+    public static final class OnceFailingInvokable extends AbstractInvokable {
+        private static volatile boolean hasFailed = false;
+
+        /**
+         * Create an Invokable task and set its environment.
+         *
+         * @param environment The environment assigned to this invokable.
+         */
+        public OnceFailingInvokable(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            if (!hasFailed && getIndexInSubtaskGroup() == 0) {
+                hasFailed = true;
+                throw new FlinkRuntimeException("Test failure.");
+            }
+        }
+
+        /** Re-arms the failure so that the next invocation of subtask 0 fails again. */
+        private static void reset() {
+            hasFailed = false;
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSlotSharingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSlotSharingITCase.java
new file mode 100644
index 0000000..06b993b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSlotSharingITCase.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/** SlotSharing tests for the declarative scheduler. */
+public class DeclarativeSchedulerSlotSharingITCase extends TestLogger {
+
+    private static final int NUMBER_TASK_MANAGERS = 1;
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 1;
+    private static final int PARALLELISM = 10;
+
+    /** How often the job is submitted; repeated runs verify that slots are released again. */
+    private static final int NUMBER_OF_RUNS = 2;
+
+    @ClassRule
+    public static final MiniClusterResource MINI_CLUSTER_RESOURCE =
+            new MiniClusterResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getConfiguration())
+                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .build());
+
+    /** Builds a configuration enabling declarative resource management and the scheduler. */
+    private static Configuration getConfiguration() {
+        final Configuration config = new Configuration();
+        config.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+        config.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Declarative);
+        return config;
+    }
+
+    @Test
+    public void testSchedulingOfJobRequiringSlotSharing() throws Exception {
+        // submit the job several times to ensure slots are cleaned up properly between runs
+        for (int run = 0; run < NUMBER_OF_RUNS; run++) {
+            submitJobAndAwaitSuccess();
+        }
+    }
+
+    /** Submits a fresh slot-sharing job and waits until it has finished successfully. */
+    private void submitJobAndAwaitSuccess() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+        final MiniCluster cluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+
+        cluster.submitJob(jobGraph).join();
+        final JobResult result = cluster.requestJobResult(jobGraph.getJobID()).join();
+
+        // converting the result throws an exception if the job failed
+        result.toJobExecutionResult(getClass().getClassLoader());
+
+        assertTrue(result.isSuccess());
+    }
+
+    /**
+     * Builds a streaming source -> sink job whose two vertices share one slot sharing group, so
+     * it can only run on a single slot if slot sharing works.
+     */
+    private static JobGraph createJobGraph() {
+        final SlotSharingGroup sharedGroup = new SlotSharingGroup();
+
+        final JobVertex source = createNoOpVertex("Source", sharedGroup);
+        final JobVertex sink = createNoOpVertex("sink", sharedGroup);
+        sink.connectNewDataSetAsInput(
+                source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+        final JobGraph jobGraph = new JobGraph("Simple job", source, sink);
+        jobGraph.setJobType(JobType.STREAMING);
+        return jobGraph;
+    }
+
+    /** Creates a {@link NoOpInvokable} vertex with {@link #PARALLELISM} in the given group. */
+    private static JobVertex createNoOpVertex(
+            final String name, final SlotSharingGroup slotSharingGroup) {
+        final JobVertex vertex = new JobVertex(name);
+        vertex.setInvokableClass(NoOpInvokable.class);
+        vertex.setParallelism(PARALLELISM);
+        vertex.setSlotSharingGroup(slotSharingGroup);
+        return vertex;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
new file mode 100644
index 0000000..44dc879
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
+import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
+import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.offerSlots;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link DeclarativeScheduler}. */
+public class DeclarativeSchedulerTest extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+
+    /** The single vertex of every job graph created by {@link #createJobGraph()}. */
+    private static final JobVertex JOB_VERTEX;
+
+    static {
+        JOB_VERTEX = new JobVertex("v1");
+        JOB_VERTEX.setParallelism(PARALLELISM);
+        JOB_VERTEX.setInvokableClass(AbstractInvokable.class);
+    }
+
+    /** Main-thread executor bound to the thread that runs the test. */
+    private final ManuallyTriggeredComponentMainThreadExecutor mainThreadExecutor =
+            new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
+
+    @Test
+    public void testInitialState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        assertThat(scheduler.getState(), instanceOf(Created.class));
+    }
+
+    @Test
+    public void testIsState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        final State state = scheduler.getState();
+
+        assertThat(scheduler.isState(state), is(true));
+        assertThat(scheduler.isState(new DummyState()), is(false));
+    }
+
+    @Test
+    public void testRunIfState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        final AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(scheduler.getState(), () -> ran.set(true));
+        assertThat(ran.get(), is(true));
+    }
+
+    @Test
+    public void testRunIfStateWithStateMismatch() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        final AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(new DummyState(), () -> ran.set(true));
+        assertThat(ran.get(), is(false));
+    }
+
+    @Test
+    public void testHasEnoughResourcesReturnsFalseIfUnsatisfied() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        scheduler.startScheduling();
+
+        final ResourceCounter resourceRequirement =
+                ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), is(false));
+    }
+
+    @Test
+    public void testHasEnoughResourcesReturnsTrueIfSatisfied() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph);
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        final ResourceCounter resourceRequirement =
+                ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
+
+        offerSlots(
+                declarativeSlotPool, createSlotOffersForResourceRequirements(resourceRequirement));
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), is(true));
+    }
+
+    @Test
+    public void testHasEnoughResourcesUsesUnmatchedSlotsAsUnknown() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph);
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        final int numRequiredSlots = 1;
+        final ResourceCounter requiredResources =
+                ResourceCounter.withResource(ResourceProfile.UNKNOWN, numRequiredSlots);
+        // offer slots with a concrete profile that does not equal the UNKNOWN requirement
+        final ResourceCounter providedResources =
+                ResourceCounter.withResource(
+                        ResourceProfile.newBuilder().setCpuCores(1).build(), numRequiredSlots);
+
+        offerSlots(declarativeSlotPool, createSlotOffersForResourceRequirements(providedResources));
+
+        assertThat(scheduler.hasEnoughResources(requiredResources), is(true));
+    }
+
+    @Test
+    public void testExecutionGraphGenerationWithAvailableResources() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph);
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        final int numAvailableSlots = 1;
+
+        offerSlots(
+                declarativeSlotPool,
+                createSlotOffersForResourceRequirements(
+                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, numAvailableSlots)));
+
+        final ExecutionGraph executionGraph =
+                scheduler.createExecutionGraphWithAvailableResources();
+
+        // the vertex asks for PARALLELISM subtasks but only numAvailableSlots slots were offered
+        assertThat(
+                executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism(),
+                is(numAvailableSlots));
+    }
+
+    @Test
+    public void testFatalErrorsForwardedToFatalErrorHandler() throws Exception {
+        final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor)
+                        .setFatalErrorHandler(fatalErrorHandler)
+                        .build();
+
+        final RuntimeException exception = new RuntimeException();
+
+        scheduler.runIfState(
+                scheduler.getState(),
+                () -> {
+                    throw exception;
+                });
+
+        assertThat(fatalErrorHandler.getException(), is(exception));
+    }
+
+    // ---------------------------------------------------------------------------------------------
+    // State transition tests
+    // ---------------------------------------------------------------------------------------------
+
+    @Test
+    public void testStartSchedulingTransitionsToWaitingForResources() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        scheduler.startScheduling();
+
+        assertThat(scheduler.getState(), instanceOf(WaitingForResources.class));
+    }
+
+    @Test
+    public void testStartSchedulingSetsResourceRequirements() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph);
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        assertThat(
+                declarativeSlotPool.getResourceRequirements(),
+                contains(ResourceRequirement.create(ResourceProfile.UNKNOWN, PARALLELISM)));
+    }
+
+    /** Tests that the listener for new slots is properly set up. */
+    @Test
+    public void testResourceAcquisitionTriggersJobExecution() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph);
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        offerSlots(
+                declarativeSlotPool,
+                createSlotOffersForResourceRequirements(
+                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, PARALLELISM)));
+
+        assertThat(scheduler.getState(), instanceOf(Executing.class));
+    }
+
+    @Test
+    public void testGoToFinished() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build();
+
+        scheduler.goToFinished(archivedExecutionGraph);
+
+        assertThat(scheduler.getState(), instanceOf(Finished.class));
+    }
+
+    @Test
+    public void testGoToFinishedNotifiesJobListener() throws Exception {
+        final AtomicReference<JobStatus> jobStatusUpdate = new AtomicReference<>();
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor)
+                        .setJobStatusListener(
+                                (jobId, newJobStatus, timestamp, error) ->
+                                        jobStatusUpdate.set(newJobStatus))
+                        .build();
+
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build();
+
+        scheduler.goToFinished(archivedExecutionGraph);
+
+        assertThat(jobStatusUpdate.get(), is(archivedExecutionGraph.getState()));
+    }
+
+    @Test
+    public void testGoToFinishedShutsDownCheckpointingComponents() throws Exception {
+        final CompletableFuture<JobStatus> completedCheckpointStoreShutdownFuture =
+                new CompletableFuture<>();
+        final CompletedCheckpointStore completedCheckpointStore =
+                new TestingCompletedCheckpointStore(completedCheckpointStoreShutdownFuture);
+
+        final CompletableFuture<JobStatus> checkpointIdCounterShutdownFuture =
+                new CompletableFuture<>();
+        final CheckpointIDCounter checkpointIdCounter =
+                new TestingCheckpointIDCounter(checkpointIdCounterShutdownFuture);
+
+        final JobGraph jobGraph = createJobGraph();
+        // checkpointing components are only created if checkpointing is enabled
+        jobGraph.setSnapshotSettings(
+                new JobCheckpointingSettings(
+                        CheckpointCoordinatorConfiguration.builder().build(), null));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setCheckpointRecoveryFactory(
+                                new TestingCheckpointRecoveryFactory(
+                                        completedCheckpointStore, checkpointIdCounter))
+                        .build();
+
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build();
+
+        scheduler.goToFinished(archivedExecutionGraph);
+
+        assertThat(completedCheckpointStoreShutdownFuture.get(), is(JobStatus.FAILED));
+        assertThat(checkpointIdCounterShutdownFuture.get(), is(JobStatus.FAILED));
+    }
+
+    @Test
+    public void testTransitionToStateCallsOnEnter() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        final LifecycleMethodCapturingState firstState = new LifecycleMethodCapturingState();
+
+        scheduler.transitionToState(firstState);
+        assertThat(firstState.onEnterCalled, is(true));
+        assertThat(firstState.onLeaveCalled, is(false));
+    }
+
+    @Test
+    public void testTransitionToStateCallsOnLeave() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        final LifecycleMethodCapturingState firstState = new LifecycleMethodCapturingState();
+        final DummyState secondState = new DummyState();
+
+        scheduler.transitionToState(firstState);
+        // forget the onEnter call so that only the effects of the second transition are checked
+        firstState.reset();
+
+        scheduler.transitionToState(secondState);
+        assertThat(firstState.onEnterCalled, is(false));
+        assertThat(firstState.onLeaveCalled, is(true));
+        assertThat(firstState.onLeaveNewStateArgument, sameInstance(secondState.getClass()));
+    }
+
+    @Test
+    public void testTransitionToStateIgnoresDuplicateTransitions() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        final LifecycleMethodCapturingState state = new LifecycleMethodCapturingState();
+        scheduler.transitionToState(state);
+        state.reset();
+
+        // attempt to transition into the state we are already in
+        scheduler.transitionToState(state);
+
+        assertThat(state.onEnterCalled, is(false));
+        assertThat(state.onLeaveCalled, is(false));
+    }
+
+    // ---------------------------------------------------------------------------------------------
+    // Failure handling tests
+    // ---------------------------------------------------------------------------------------------
+
+    @Test
+    public void testHowToHandleFailureRejectedByStrategy() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor)
+                        .setRestartBackoffTimeStrategy(NoRestartBackoffTimeStrategy.INSTANCE)
+                        .build();
+
+        assertThat(scheduler.howToHandleFailure(new Exception("test")).canRestart(), is(false));
+    }
+
+    @Test
+    public void testHowToHandleFailureAllowedByStrategy() throws Exception {
+        final TestRestartBackoffTimeStrategy restartBackoffTimeStrategy =
+                new TestRestartBackoffTimeStrategy(true, 1234);
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor)
+                        .setRestartBackoffTimeStrategy(restartBackoffTimeStrategy)
+                        .build();
+
+        final Executing.FailureResult failureResult =
+                scheduler.howToHandleFailure(new Exception("test"));
+
+        assertThat(failureResult.canRestart(), is(true));
+        assertThat(
+                failureResult.getBackoffTime().toMillis(),
+                is(restartBackoffTimeStrategy.getBackoffTime()));
+    }
+
+    @Test
+    public void testHowToHandleFailureUnrecoverableFailure() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        assertThat(
+                scheduler
+                        .howToHandleFailure(new SuppressRestartsException(new Exception("test")))
+                        .canRestart(),
+                is(false));
+    }
+
+    // ---------------------------------------------------------------------------------------------
+    // Illegal state behavior tests
+    // ---------------------------------------------------------------------------------------------
+
+    @Test
+    public void testTriggerSavepointFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.triggerSavepoint("some directory", false),
+                futureFailedWith(CheckpointException.class));
+    }
+
+    @Test
+    public void testStopWithSavepointFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.stopWithSavepoint("some directory", false),
+                futureFailedWith(CheckpointException.class));
+    }
+
+    @Test(expected = TaskNotRunningException.class)
+    public void testDeliverOperatorEventToCoordinatorFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        scheduler.deliverOperatorEventToCoordinator(
+                new ExecutionAttemptID(), new OperatorID(), new TestOperatorEvent());
+    }
+
+    @Test
+    public void testDeliverCoordinationRequestToCoordinatorFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.deliverCoordinationRequestToCoordinator(
+                        new OperatorID(), new CoordinationRequest() {}),
+                futureFailedWith(FlinkException.class));
+    }
+
+    @Test
+    public void testUpdateTaskExecutionStateReturnsFalseInIllegalState() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.updateTaskExecutionState(
+                        new TaskExecutionStateTransition(
+                                new TaskExecutionState(
+                                        jobGraph.getJobID(),
+                                        new ExecutionAttemptID(),
+                                        ExecutionState.FAILED))),
+                is(false));
+    }
+
+    @Test(expected = IOException.class)
+    public void testRequestNextInputSplitFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        scheduler.requestNextInputSplit(JOB_VERTEX.getID(), new ExecutionAttemptID());
+    }
+
+    @Test(expected = PartitionProducerDisposedException.class)
+    public void testRequestPartitionStateFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        scheduler.requestPartitionState(new IntermediateDataSetID(), new ResultPartitionID());
+    }
+
+    // ---------------------------------------------------------------------------------------------
+    // Utils
+    // ---------------------------------------------------------------------------------------------
+
+    /** Creates a streaming {@link JobGraph} containing only {@link #JOB_VERTEX}. */
+    private static JobGraph createJobGraph() {
+        final JobGraph jobGraph = new JobGraph(JOB_VERTEX);
+        jobGraph.setJobType(JobType.STREAMING);
+        return jobGraph;
+    }
+
+    /**
+     * Creates a {@link DefaultDeclarativeSlotPool} for the job of the given graph. The pool is
+     * backed by a fresh {@link DefaultAllocatedSlotPool}, gets a no-op callback as its third
+     * constructor argument and has both of its timeout arguments set to ten minutes.
+     */
+    private static DefaultDeclarativeSlotPool createDeclarativeSlotPool(JobGraph jobGraph) {
+        return new DefaultDeclarativeSlotPool(
+                jobGraph.getJobID(),
+                new DefaultAllocatedSlotPool(),
+                ignored -> {},
+                Time.minutes(10),
+                Time.minutes(10));
+    }
+
+    /**
+     * A {@link DummyState} that records whether {@link #onEnter()} and {@link #onLeave(Class)}
+     * were called, and which target state class was passed to {@link #onLeave(Class)}.
+     */
+    private static class LifecycleMethodCapturingState extends DummyState {
+        boolean onEnterCalled = false;
+        boolean onLeaveCalled = false;
+        @Nullable Class<? extends State> onLeaveNewStateArgument = null;
+
+        /** Clears all recorded calls. */
+        void reset() {
+            onEnterCalled = false;
+            onLeaveCalled = false;
+            onLeaveNewStateArgument = null;
+        }
+
+        @Override
+        public void onEnter() {
+            onEnterCalled = true;
+        }
+
+        @Override
+        public void onLeave(Class<? extends State> newState) {
+            onLeaveCalled = true;
+            onLeaveNewStateArgument = newState;
+        }
+    }
+
+    /** Minimal {@link State} whose operations do nothing and whose getters return {@code null}. */
+    private static class DummyState implements State {
+
+        @Override
+        public void cancel() {}
+
+        @Override
+        public void suspend(Throwable cause) {}
+
+        @Override
+        public JobStatus getJobStatus() {
+            return null;
+        }
+
+        @Override
+        public ArchivedExecutionGraph getJob() {
+            return null;
+        }
+
+        @Override
+        public void handleGlobalFailure(Throwable cause) {}
+
+        @Override
+        public Logger getLogger() {
+            return null;
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
index bf83169..621afa5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
@@ -36,6 +36,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
/** Tests for the {@link SharedSlot}. */
public class SharedSlotTest extends TestLogger {
@@ -194,6 +195,16 @@ public class SharedSlotTest extends TestLogger {
}));
sharedSlot.release(new Exception("test"));
+
+ // if all logical slots were released, and the sharedSlot no longer allows the allocation of
+ // logical slots, then the slot release was completed
+ assertThat(logicalSlot1.isAlive(), is(false));
+ assertThat(logicalSlot2.isAlive(), is(false));
+ try {
+ sharedSlot.allocateLogicalSlot();
+ fail("Allocation of logical slot should have failed because the slot was released.");
+ } catch (IllegalStateException expected) {
+ }
}
@Test(expected = IllegalStateException.class)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java
new file mode 100644
index 0000000..c053104
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Simple {@link AbstractInvokable} which blocks the first time it is run. Moreover, one can wait
+ * until n instances of this invokable are running by calling {@link #waitUntilOpsAreRunning()},
+ * where n is the value passed to {@link #resetFor(int)}.
+ *
+ * <p>An invocation blocks while the static blocking flag is set and the instance has not been
+ * cancelled. Once any invocation terminates (normally, after cancellation, or because its wait
+ * was interrupted), the flag is cleared, so later invocations return immediately.
+ *
+ * <p>Before using this class it is important to call {@link #resetFor}.
+ */
+public class OnceBlockingNoOpInvokable extends AbstractInvokable {
+
+    /** Number of invocations started since the last {@link #resetInstanceCount()}. */
+    private static final AtomicInteger instanceCount = new AtomicInteger(0);
+
+    /** Counted down once per started invocation; re-created by {@link #resetFor(int)}. */
+    private static volatile CountDownLatch numOpsPending = new CountDownLatch(1);
+
+    /** Whether invocations block; cleared as soon as an invocation terminates. */
+    private static volatile boolean isBlocking = true;
+
+    /** Monitor on which {@link #invoke()} waits and which {@link #cancel()} notifies. */
+    private final Object lock = new Object();
+
+    /** Set to {@code false} by {@link #cancel()} to release a blocked {@link #invoke()}. */
+    private volatile boolean running = true;
+
+    public OnceBlockingNoOpInvokable(Environment environment) {
+        super(environment);
+    }
+
+    @Override
+    public void invoke() throws Exception {
+        instanceCount.incrementAndGet();
+        numOpsPending.countDown();
+
+        try {
+            synchronized (lock) {
+                while (isBlocking && running) {
+                    lock.wait();
+                }
+            }
+        } finally {
+            // Clear the flag even when wait() throws InterruptedException (the task thread may be
+            // interrupted while being cancelled); otherwise the next attempt would block again.
+            isBlocking = false;
+        }
+    }
+
+    @Override
+    public void cancel() throws Exception {
+        synchronized (lock) {
+            running = false;
+            lock.notifyAll();
+        }
+    }
+
+    /** Blocks until as many invocations have started as configured via {@link #resetFor(int)}. */
+    public static void waitUntilOpsAreRunning() throws InterruptedException {
+        numOpsPending.await();
+    }
+
+    /** Returns the number of invocations started since the last {@link #resetInstanceCount()}. */
+    public static int getInstanceCount() {
+        return instanceCount.get();
+    }
+
+    /** Resets the invocation counter to zero. */
+    public static void resetInstanceCount() {
+        instanceCount.set(0);
+    }
+
+    /**
+     * Re-arms the blocking behavior and makes {@link #waitUntilOpsAreRunning()} wait for {@code
+     * parallelism} started invocations. Does not reset the instance counter.
+     */
+    public static void resetFor(int parallelism) {
+        numOpsPending = new CountDownLatch(parallelism);
+        isBlocking = true;
+    }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java
new file mode 100644
index 0000000..23dacd4
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import static org.junit.Assume.assumeTrue;
+
+/** Integration tests for the declarative scheduler. */
+public class DeclarativeSchedulerITCase extends TestLogger {
+
+    private static final int NUMBER_TASK_MANAGERS = 2;
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int PARALLELISM = NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS;
+
+    /** Cluster configuration; also consulted to skip tests when declarative RM is disabled. */
+    private static final Configuration configuration = getConfiguration();
+
+    /** Creates a configuration enabling the declarative scheduler and declarative RM. */
+    private static Configuration getConfiguration() {
+        final Configuration configuration = new Configuration();
+
+        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Declarative);
+        configuration.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return configuration;
+    }
+
+    @ClassRule
+    public static final MiniClusterResource MINI_CLUSTER_WITH_CLIENT_RESOURCE =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(configuration)
+                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .build());
+
+    /** Tests that the declarative scheduler can recover stateful operators. */
+    @Test
+    public void testGlobalFailoverCanRecoverState() throws Exception {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        env.enableCheckpointing(20L, CheckpointingMode.EXACTLY_ONCE);
+        final DataStreamSource<Integer> input = env.addSource(new SimpleSource());
+
+        input.addSink(new DiscardingSink<>());
+
+        env.execute();
+    }
+
+    /**
+     * Simple source which fails once after a successful checkpoint has been taken. Upon recovery
+     * the source will immediately terminate.
+     */
+    public static final class SimpleSource extends RichParallelSourceFunction<Integer>
+            implements CheckpointListener, CheckpointedFunction {
+
+        private static final ListStateDescriptor<Boolean> unionStateListDescriptor =
+                new ListStateDescriptor<>("state", Boolean.class);
+
+        private volatile boolean running = true;
+
+        @Nullable private ListState<Boolean> unionListState = null;
+
+        /** True if any subtask of a previous attempt stored a marker in the union state. */
+        private boolean hasFailedBefore = false;
+
+        /**
+         * Set once a checkpoint has completed. Volatile because notifyCheckpointComplete() and
+         * the run() loop are not guaranteed to execute on the same thread.
+         */
+        private volatile boolean fail = false;
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            while (running && !hasFailedBefore) {
+                synchronized (ctx.getCheckpointLock()) {
+                    ctx.collect(getRuntimeContext().getIndexOfThisSubtask());
+                }
+
+                // Sleep without holding the checkpoint lock so that this loop does not starve
+                // other work that has to acquire the lock (e.g. checkpointing).
+                Thread.sleep(5L);
+
+                if (fail) {
+                    throw new FlinkException("Test failure.");
+                }
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            fail = true;
+        }
+
+        /** No-op: the marker is written to the union state in initializeState(). */
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {}
+
+        /**
+         * Restores whether a previous attempt ran (any {@code true} entry in the union list
+         * state) and replaces the state with a single {@code true} marker for the next attempt.
+         */
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            unionListState =
+                    context.getOperatorStateStore().getUnionListState(unionStateListDescriptor);
+
+            for (Boolean previousState : unionListState.get()) {
+                hasFailedBefore |= previousState;
+            }
+
+            unionListState.clear();
+            unionListState.add(true);
+        }
+    }
+}