You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/02/18 09:14:59 UTC

[flink] 11/12: [FLINK-21100][coordination] Add DeclarativeScheduler

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0efe64c4297d0f8b46ec22b4be88064d530bf488
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Feb 8 20:38:40 2021 +0100

    [FLINK-21100][coordination] Add DeclarativeScheduler
---
 .../flink/configuration/JobManagerOptions.java     |   8 +-
 .../dispatcher/DefaultJobManagerRunnerFactory.java |   5 +-
 .../dispatcher/SchedulerNGFactoryFactory.java      |  23 +-
 .../failover/flip1/ExecutionFailureHandler.java    |   4 +-
 .../jobmaster/slotpool/SlotPoolServiceFactory.java |   9 +-
 ...pdateSchedulerNgOnInternalFailuresListener.java |   2 +-
 .../declarative/DeclarativeScheduler.java          | 930 +++++++++++++++++++++
 .../declarative/DeclarativeSchedulerFactory.java   | 110 +++
 .../declarative/JobGraphJobInformation.java        | 101 +++
 .../ParallelismAndResourceAssignments.java         |  50 ++
 .../declarative/allocator/JobInformation.java      |   8 +
 .../declarative/allocator/SharedSlot.java          |   4 +-
 .../dispatcher/SchedulerNGFactoryFactoryTest.java  |  16 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |   6 +-
 .../slotpool/DefaultDeclarativeSlotPoolTest.java   |   4 +-
 .../runtime/jobmaster/utils/JobMasterBuilder.java  |   6 +-
 .../declarative/DeclarativeSchedulerBuilder.java   | 189 +++++
 .../DeclarativeSchedulerClusterITCase.java         | 147 ++++
 .../DeclarativeSchedulerSimpleITCase.java          | 163 ++++
 .../DeclarativeSchedulerSlotSharingITCase.java     | 113 +++
 .../declarative/DeclarativeSchedulerTest.java      | 617 ++++++++++++++
 .../declarative/allocator/SharedSlotTest.java      |  11 +
 .../testtasks/OnceBlockingNoOpInvokable.java       |  88 ++
 .../scheduling/DeclarativeSchedulerITCase.java     | 150 ++++
 24 files changed, 2744 insertions(+), 20 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 06da369..3e86c69 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -356,12 +356,16 @@ public class JobManagerOptions {
                             Description.builder()
                                     .text(
                                             "Determines which scheduler implementation is used to schedule tasks. Accepted values are:")
-                                    .list(text("'Ng': new generation scheduler"))
+                                    .list(
+                                            text("'Ng': new generation scheduler"),
+                                            text(
+                                                    "'Declarative': declarative scheduler; supports reactive mode"))
                                     .build());
 
     /** Type of scheduler implementation. */
     public enum SchedulerType {
-        Ng
+        Ng,
+        Declarative
     }
 
     @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
index b063a0f..839faeb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
@@ -57,9 +57,10 @@ public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
                 JobMasterConfiguration.fromConfiguration(configuration);
 
         final SlotPoolServiceFactory slotPoolFactory =
-                SlotPoolServiceFactory.fromConfiguration(configuration);
+                SlotPoolServiceFactory.fromConfiguration(configuration, jobGraph.getJobType());
         final SchedulerNGFactory schedulerNGFactory =
-                SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
+                SchedulerNGFactoryFactory.createSchedulerNGFactory(
+                        configuration, jobGraph.getJobType());
         final ShuffleMaster<?> shuffleMaster =
                 ShuffleServiceLoader.loadShuffleServiceFactory(configuration)
                         .createShuffleMaster(configuration);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
index 34eae71..a7e5b02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
@@ -21,20 +21,39 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
 import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import org.apache.flink.runtime.scheduler.declarative.DeclarativeSchedulerFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Factory for {@link SchedulerNGFactory}. */
 public final class SchedulerNGFactoryFactory {
 
+    private static final Logger LOG = LoggerFactory.getLogger(SchedulerNGFactoryFactory.class);
+
     private SchedulerNGFactoryFactory() {}
 
-    public static SchedulerNGFactory createSchedulerNGFactory(final Configuration configuration) {
-        final JobManagerOptions.SchedulerType schedulerType =
+    public static SchedulerNGFactory createSchedulerNGFactory(
+            final Configuration configuration, JobType jobType) {
+        JobManagerOptions.SchedulerType schedulerType =
                 configuration.get(JobManagerOptions.SCHEDULER);
+
+        if (schedulerType == JobManagerOptions.SchedulerType.Declarative
+                && jobType == JobType.BATCH) {
+            LOG.info(
+                    "Declarative Scheduler configured, but Batch job detected. Changing scheduler type to NG / DefaultScheduler.");
+            // overwrite
+            schedulerType = JobManagerOptions.SchedulerType.Ng;
+        }
+
         switch (schedulerType) {
             case Ng:
                 return new DefaultSchedulerFactory();
+            case Declarative:
+                return new DeclarativeSchedulerFactory();
 
             default:
                 throw new IllegalArgumentException(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
index 1bfe42c..6961b45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.runtime.executiongraph.failover.flip1;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
@@ -122,8 +121,7 @@ public class ExecutionFailureHandler {
         }
     }
 
-    @VisibleForTesting
-    static boolean isUnrecoverableError(Throwable cause) {
+    public static boolean isUnrecoverableError(Throwable cause) {
         Optional<Throwable> unrecoverableError =
                 ThrowableClassifier.findThrowableOfThrowableType(
                         cause, ThrowableType.NonRecoverableError);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
index 8a2f078..b486747 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.util.clock.SystemClock;
 
 import javax.annotation.Nonnull;
@@ -34,7 +35,7 @@ public interface SlotPoolServiceFactory {
     @Nonnull
     SlotPoolService createSlotPoolService(@Nonnull JobID jobId);
 
-    static SlotPoolServiceFactory fromConfiguration(Configuration configuration) {
+    static SlotPoolServiceFactory fromConfiguration(Configuration configuration, JobType jobType) {
         final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
         final Time slotIdleTimeout =
                 Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));
@@ -42,6 +43,12 @@ public interface SlotPoolServiceFactory {
                 Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
 
         if (ClusterOptions.isDeclarativeResourceManagementEnabled(configuration)) {
+            if (configuration.get(JobManagerOptions.SCHEDULER)
+                            == JobManagerOptions.SchedulerType.Declarative
+                    && jobType == JobType.STREAMING) {
+                return new DeclarativeSlotPoolServiceFactory(
+                        SystemClock.getInstance(), slotIdleTimeout, rpcTimeout);
+            }
 
             return new DeclarativeSlotPoolBridgeServiceFactory(
                     SystemClock.getInstance(), rpcTimeout, slotIdleTimeout, batchSlotTimeout);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalFailuresListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalFailuresListener.java
index aa3763a..b97620e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalFailuresListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalFailuresListener.java
@@ -31,7 +31,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Calls {@link SchedulerNG#updateTaskExecutionState(TaskExecutionStateTransition)} on task failure.
  * Calls {@link SchedulerNG#handleGlobalFailure(Throwable)} on global failures.
  */
-class UpdateSchedulerNgOnInternalFailuresListener implements InternalFailuresListener {
+public class UpdateSchedulerNgOnInternalFailuresListener implements InternalFailuresListener {
 
     private final SchedulerNG schedulerNg;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java
new file mode 100644
index 0000000..ac89434
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java
@@ -0,0 +1,930 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
+import org.apache.flink.runtime.scheduler.declarative.allocator.SlotAllocator;
+import org.apache.flink.runtime.scheduler.declarative.allocator.SlotSharingSlotAllocator;
+import org.apache.flink.runtime.scheduler.declarative.allocator.VertexParallelism;
+import org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ReactiveScaleUpController;
+import org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ScaleUpController;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link SchedulerNG} implementation that uses the declarative resource management and
+ * automatically adapts the parallelism in case not enough resource could be acquired to run at the
+ * configured parallelism, as described in FLIP-160.
+ *
+ * <p>This scheduler only supports jobs with streaming semantics, i.e., all vertices are connected
+ * via pipelined data-exchanges.
+ *
+ * <p>The implementation is spread over multiple {@link State} classes that control which RPCs are
+ * allowed in a given state and what state transitions are possible (see the FLIP for an overview).
+ * This class can thus be roughly split into 2 parts:
+ *
+ * <p>1) RPCs, which must forward the call to the state via {@link State#tryRun(Class,
+ * ThrowingConsumer, String)} or {@link State#tryCall(Class, FunctionWithException, String)}.
+ *
+ * <p>2) Context methods, which are called by states, to either transition into another state or
+ * access functionality of some component in the scheduler.
+ */
+public class DeclarativeScheduler
+        implements SchedulerNG,
+                Created.Context,
+                WaitingForResources.Context,
+                Executing.Context,
+                Restarting.Context,
+                Failing.Context,
+                Finished.Context {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DeclarativeScheduler.class);
+
+    private final JobGraphJobInformation jobInformation;
+
+    private final DeclarativeSlotPool declarativeSlotPool;
+
+    private final long initializationTimestamp;
+
+    private final Configuration configuration;
+    private final ScheduledExecutorService futureExecutor;
+    private final Executor ioExecutor;
+    private final ClassLoader userCodeClassLoader;
+    private final Time rpcTimeout;
+    private final BlobWriter blobWriter;
+    private final ShuffleMaster<?> shuffleMaster;
+    private final JobMasterPartitionTracker partitionTracker;
+    private final ExecutionDeploymentTracker executionDeploymentTracker;
+    private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
+
+    private final CompletedCheckpointStore completedCheckpointStore;
+    private final CheckpointIDCounter checkpointIdCounter;
+    private final CheckpointsCleaner checkpointsCleaner;
+
+    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+    private final RestartBackoffTimeStrategy restartBackoffTimeStrategy;
+
+    private final ComponentMainThreadExecutor componentMainThreadExecutor;
+    private final FatalErrorHandler fatalErrorHandler;
+
+    private final JobStatusListener jobStatusListener;
+
+    private final SlotAllocator<?> slotAllocator;
+
+    private final ScaleUpController scaleUpController;
+
+    private State state = new Created(this, LOG);
+
+    public DeclarativeScheduler(
+            JobGraph jobGraph,
+            Configuration configuration,
+            DeclarativeSlotPool declarativeSlotPool,
+            ScheduledExecutorService futureExecutor,
+            Executor ioExecutor,
+            ClassLoader userCodeClassLoader,
+            CheckpointRecoveryFactory checkpointRecoveryFactory,
+            Time rpcTimeout,
+            BlobWriter blobWriter,
+            JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            ShuffleMaster<?> shuffleMaster,
+            JobMasterPartitionTracker partitionTracker,
+            RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            ExecutionDeploymentTracker executionDeploymentTracker,
+            long initializationTimestamp,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            FatalErrorHandler fatalErrorHandler,
+            JobStatusListener jobStatusListener)
+            throws JobExecutionException {
+
+        ensureFullyPipelinedStreamingJob(jobGraph);
+
+        this.jobInformation = new JobGraphJobInformation(jobGraph);
+        this.declarativeSlotPool = declarativeSlotPool;
+        this.initializationTimestamp = initializationTimestamp;
+        this.configuration = configuration;
+        this.futureExecutor = futureExecutor;
+        this.ioExecutor = ioExecutor;
+        this.userCodeClassLoader = userCodeClassLoader;
+        this.rpcTimeout = rpcTimeout;
+        this.blobWriter = blobWriter;
+        this.shuffleMaster = shuffleMaster;
+        this.partitionTracker = partitionTracker;
+        this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+        this.executionDeploymentTracker = executionDeploymentTracker;
+        this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+        this.fatalErrorHandler = fatalErrorHandler;
+        this.completedCheckpointStore =
+                SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(
+                        jobGraph,
+                        configuration,
+                        userCodeClassLoader,
+                        checkpointRecoveryFactory,
+                        LOG);
+        this.checkpointIdCounter =
+                SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled(
+                        jobGraph, checkpointRecoveryFactory);
+        this.checkpointsCleaner = new CheckpointsCleaner();
+
+        this.slotAllocator =
+                new SlotSharingSlotAllocator(
+                        declarativeSlotPool::reserveFreeSlot,
+                        declarativeSlotPool::freeReservedSlot);
+
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
+                vertex.setParallelism(1);
+            }
+        }
+
+        declarativeSlotPool.registerNewSlotsListener(this::newResourcesAvailable);
+
+        this.componentMainThreadExecutor = mainThreadExecutor;
+        this.jobStatusListener = jobStatusListener;
+
+        this.scaleUpController = new ReactiveScaleUpController(configuration);
+    }
+
+    private static void ensureFullyPipelinedStreamingJob(JobGraph jobGraph)
+            throws RuntimeException {
+        Preconditions.checkState(
+                jobGraph.getJobType() == JobType.STREAMING,
+                "The declarative scheduler only supports streaming jobs.");
+        Preconditions.checkState(
+                jobGraph.getScheduleMode()
+                        != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+                "The declarative schedules does not support batch slot requests.");
+
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            for (JobEdge jobEdge : vertex.getInputs()) {
+                Preconditions.checkState(
+                        jobEdge.getSource().getResultType().isPipelined(),
+                        "The declarative scheduler supports pipelined data exchanges (violated by %s -> %s).",
+                        jobEdge.getSource().getProducer(),
+                        jobEdge.getTarget().getID());
+            }
+        }
+    }
+
+    private void newResourcesAvailable(Collection<? extends PhysicalSlot> physicalSlots) {
+        state.tryRun(
+                ResourceConsumer.class,
+                ResourceConsumer::notifyNewResourcesAvailable,
+                "newResourcesAvailable");
+    }
+
+    @Override
+    public void startScheduling() {
+        state.as(Created.class)
+                .orElseThrow(
+                        () ->
+                                new IllegalStateException(
+                                        "Can only start scheduling when being in Created state."))
+                .startScheduling();
+    }
+
+    @Override
+    public void suspend(Throwable cause) {
+        state.suspend(cause);
+    }
+
+    @Override
+    public void cancel() {
+        state.cancel();
+    }
+
+    @Override
+    public CompletableFuture<Void> getTerminationFuture() {
+        return terminationFuture;
+    }
+
+    @Override
+    public void handleGlobalFailure(Throwable cause) {
+        state.handleGlobalFailure(cause);
+    }
+
+    @Override
+    public boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState) {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.updateTaskExecutionState(
+                                        taskExecutionState),
+                        "updateTaskExecutionState")
+                .orElse(false);
+    }
+
+    @Override
+    public SerializedInputSplit requestNextInputSplit(
+            JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.requestNextInputSplit(
+                                        vertexID, executionAttempt),
+                        "requestNextInputSplit")
+                .orElseThrow(
+                        () -> new IOException("Scheduler is currently not executing the job."));
+    }
+
+    @Override
+    public ExecutionState requestPartitionState(
+            IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId)
+            throws PartitionProducerDisposedException {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.requestPartitionState(
+                                        intermediateResultId, resultPartitionId),
+                        "requestPartitionState")
+                .orElseThrow(() -> new PartitionProducerDisposedException(resultPartitionId));
+    }
+
+    @Override
+    public void notifyPartitionDataAvailable(ResultPartitionID partitionID) {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        stateWithExecutionGraph.notifyPartitionDataAvailable(partitionID),
+                "notifyPartitionDataAvailable");
+    }
+
+    @Override
+    public ArchivedExecutionGraph requestJob() {
+        return state.getJob();
+    }
+
+    @Override
+    public JobStatus requestJobStatus() {
+        return state.getJobStatus();
+    }
+
+    @Override
+    public JobDetails requestJobDetails() {
+        return JobDetails.createDetailsForJob(state.getJob());
+    }
+
+    @Override
+    public KvStateLocation requestKvStateLocation(JobID jobId, String registrationName)
+            throws UnknownKvStateLocation, FlinkJobNotFoundException {
+        final Optional<StateWithExecutionGraph> asOptional =
+                state.as(StateWithExecutionGraph.class);
+
+        if (asOptional.isPresent()) {
+            return asOptional.get().requestKvStateLocation(jobId, registrationName);
+        } else {
+            throw new UnknownKvStateLocation(registrationName);
+        }
+    }
+
+    @Override
+    public void notifyKvStateRegistered(
+            JobID jobId,
+            JobVertexID jobVertexId,
+            KeyGroupRange keyGroupRange,
+            String registrationName,
+            KvStateID kvStateId,
+            InetSocketAddress kvStateServerAddress)
+            throws FlinkJobNotFoundException {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        stateWithExecutionGraph.notifyKvStateRegistered(
+                                jobId,
+                                jobVertexId,
+                                keyGroupRange,
+                                registrationName,
+                                kvStateId,
+                                kvStateServerAddress),
+                "notifyKvStateRegistered");
+    }
+
+    @Override
+    public void notifyKvStateUnregistered(
+            JobID jobId,
+            JobVertexID jobVertexId,
+            KeyGroupRange keyGroupRange,
+            String registrationName)
+            throws FlinkJobNotFoundException {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        stateWithExecutionGraph.notifyKvStateUnregistered(
+                                jobId, jobVertexId, keyGroupRange, registrationName),
+                "notifyKvStateUnregistered");
+    }
+
+    @Override
+    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        stateWithExecutionGraph.updateAccumulators(accumulatorSnapshot),
+                "updateAccumulators");
+    }
+
+    @Override
+    public CompletableFuture<String> triggerSavepoint(
+            @Nullable String targetDirectory, boolean cancelJob) {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.triggerSavepoint(
+                                        targetDirectory, cancelJob),
+                        "triggerSavepoint")
+                .orElse(
+                        FutureUtils.completedExceptionally(
+                                new CheckpointException(
+                                        "The Flink job is currently not executing.",
+                                        CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE)));
+    }
+
+    @Override
+    public void acknowledgeCheckpoint(
+            JobID jobID,
+            ExecutionAttemptID executionAttemptID,
+            long checkpointId,
+            CheckpointMetrics checkpointMetrics,
+            TaskStateSnapshot checkpointState) {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        stateWithExecutionGraph.acknowledgeCheckpoint(
+                                jobID,
+                                executionAttemptID,
+                                checkpointId,
+                                checkpointMetrics,
+                                checkpointState),
+                "acknowledgeCheckpoint");
+    }
+
+    @Override
+    public void reportCheckpointMetrics(
+            JobID jobID,
+            ExecutionAttemptID executionAttemptID,
+            long checkpointId,
+            CheckpointMetrics checkpointMetrics) {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        stateWithExecutionGraph.reportCheckpointMetrics(
+                                executionAttemptID, checkpointId, checkpointMetrics),
+                "reportCheckpointMetrics");
+    }
+
+    @Override
+    public void declineCheckpoint(DeclineCheckpoint decline) {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph -> stateWithExecutionGraph.declineCheckpoint(decline),
+                "declineCheckpoint");
+    }
+
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(
+            String targetDirectory, boolean advanceToEndOfEventTime) {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.stopWithSavepoint(
+                                        targetDirectory, advanceToEndOfEventTime),
+                        "stopWithSavepoint")
+                .orElse(
+                        FutureUtils.completedExceptionally(
+                                new CheckpointException(
+                                        "The Flink job is currently not executing.",
+                                        CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE)));
+    }
+
+    @Override
+    public void deliverOperatorEventToCoordinator(
+            ExecutionAttemptID taskExecution, OperatorID operator, OperatorEvent evt)
+            throws FlinkException {
+        final StateWithExecutionGraph stateWithExecutionGraph =
+                state.as(StateWithExecutionGraph.class)
+                        .orElseThrow(
+                                () ->
+                                        new TaskNotRunningException(
+                                                "Task is not known or in state running on the JobManager."));
+
+        stateWithExecutionGraph.deliverOperatorEventToCoordinator(taskExecution, operator, evt);
+    }
+
+    @Override
+    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+            OperatorID operator, CoordinationRequest request) throws FlinkException {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.deliverCoordinationRequestToCoordinator(
+                                        operator, request),
+                        "deliverCoordinationRequestToCoordinator")
+                .orElseGet(
+                        () ->
+                                FutureUtils.completedExceptionally(
+                                        new FlinkException(
+                                                "Coordinator of operator "
+                                                        + operator
+                                                        + " does not exist")));
+    }
+
+    // ----------------------------------------------------------------
+
+    @Override
+    public boolean hasEnoughResources(ResourceCounter desiredResources) {
+        final Collection<? extends SlotInfo> allSlots =
+                declarativeSlotPool.getFreeSlotsInformation();
+        ResourceCounter outstandingResources = desiredResources;
+
+        final Iterator<? extends SlotInfo> slotIterator = allSlots.iterator();
+
+        while (!outstandingResources.isEmpty() && slotIterator.hasNext()) {
+            final SlotInfo slotInfo = slotIterator.next();
+            final ResourceProfile resourceProfile = slotInfo.getResourceProfile();
+
+            if (outstandingResources.containsResource(resourceProfile)) {
+                outstandingResources = outstandingResources.subtract(resourceProfile, 1);
+            } else {
+                outstandingResources = outstandingResources.subtract(ResourceProfile.UNKNOWN, 1);
+            }
+        }
+
+        return outstandingResources.isEmpty();
+    }
+
+    private <T extends VertexParallelism>
+            ParallelismAndResourceAssignments determineParallelismAndAssignResources(
+                    SlotAllocator<T> slotAllocator) throws JobExecutionException {
+
+        final T vertexParallelism =
+                slotAllocator
+                        .determineParallelism(
+                                jobInformation, declarativeSlotPool.getFreeSlotsInformation())
+                        .orElseThrow(
+                                () ->
+                                        new JobExecutionException(
+                                                jobInformation.getJobID(),
+                                                "Not enough resources available for scheduling."));
+
+        final Map<ExecutionVertexID, LogicalSlot> slotAssignments =
+                slotAllocator.reserveResources(vertexParallelism);
+
+        return new ParallelismAndResourceAssignments(
+                slotAssignments, vertexParallelism.getMaxParallelismForVertices());
+    }
+
+    @Override
+    public ExecutionGraph createExecutionGraphWithAvailableResources() throws Exception {
+        final ParallelismAndResourceAssignments parallelismAndResourceAssignments =
+                determineParallelismAndAssignResources(slotAllocator);
+
+        JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
+        for (JobVertex vertex : adjustedJobGraph.getVertices()) {
+            vertex.setParallelism(parallelismAndResourceAssignments.getParallelism(vertex.getID()));
+        }
+
+        final ExecutionGraph executionGraph = createExecutionGraphAndRestoreState(adjustedJobGraph);
+
+        executionGraph.start(componentMainThreadExecutor);
+        executionGraph.transitionToRunning();
+
+        executionGraph.setInternalTaskFailuresListener(
+                new UpdateSchedulerNgOnInternalFailuresListener(this, jobInformation.getJobID()));
+
+        for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
+            final LogicalSlot assignedSlot =
+                    parallelismAndResourceAssignments.getAssignedSlot(executionVertex.getID());
+            executionVertex
+                    .getCurrentExecutionAttempt()
+                    .registerProducedPartitions(assignedSlot.getTaskManagerLocation(), false);
+            executionVertex.tryAssignResource(assignedSlot);
+        }
+        return executionGraph;
+    }
+
+    private ExecutionGraph createExecutionGraphAndRestoreState(JobGraph adjustedJobGraph)
+            throws Exception {
+        ExecutionDeploymentListener executionDeploymentListener =
+                new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
+        ExecutionStateUpdateListener executionStateUpdateListener =
+                (execution, newState) -> {
+                    if (newState.isTerminal()) {
+                        executionDeploymentTracker.stopTrackingDeploymentOf(execution);
+                    }
+                };
+
+        final ExecutionGraph newExecutionGraph =
+                ExecutionGraphBuilder.buildGraph(
+                        adjustedJobGraph,
+                        configuration,
+                        futureExecutor,
+                        ioExecutor,
+                        userCodeClassLoader,
+                        completedCheckpointStore,
+                        checkpointsCleaner,
+                        checkpointIdCounter,
+                        rpcTimeout,
+                        jobManagerJobMetricGroup,
+                        blobWriter,
+                        LOG,
+                        shuffleMaster,
+                        partitionTracker,
+                        executionDeploymentListener,
+                        executionStateUpdateListener,
+                        initializationTimestamp);
+
+        final CheckpointCoordinator checkpointCoordinator =
+                newExecutionGraph.getCheckpointCoordinator();
+
+        if (checkpointCoordinator != null) {
+            // check whether we find a valid checkpoint
+            if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
+                    new HashSet<>(newExecutionGraph.getAllVertices().values()))) {
+
+                // check whether we can restore from a savepoint
+                tryRestoreExecutionGraphFromSavepoint(
+                        newExecutionGraph, adjustedJobGraph.getSavepointRestoreSettings());
+            }
+        }
+
+        return newExecutionGraph;
+    }
+
+    /**
+     * Tries to restore the given {@link ExecutionGraph} from the provided {@link
+     * SavepointRestoreSettings}, iff checkpointing is enabled.
+     *
+     * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored
+     * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about
+     *     the savepoint to restore from
+     * @throws Exception if the {@link ExecutionGraph} could not be restored
+     */
+    private void tryRestoreExecutionGraphFromSavepoint(
+            ExecutionGraph executionGraphToRestore,
+            SavepointRestoreSettings savepointRestoreSettings)
+            throws Exception {
+        if (savepointRestoreSettings.restoreSavepoint()) {
+            final CheckpointCoordinator checkpointCoordinator =
+                    executionGraphToRestore.getCheckpointCoordinator();
+            if (checkpointCoordinator != null) {
+                checkpointCoordinator.restoreSavepoint(
+                        savepointRestoreSettings.getRestorePath(),
+                        savepointRestoreSettings.allowNonRestoredState(),
+                        executionGraphToRestore.getAllVertices(),
+                        userCodeClassLoader);
+            }
+        }
+    }
+
+    @Override
+    public ArchivedExecutionGraph getArchivedExecutionGraph(
+            JobStatus jobStatus, @Nullable Throwable cause) {
+        return ArchivedExecutionGraph.createFromInitializingJob(
+                jobInformation.getJobID(),
+                jobInformation.getName(),
+                jobStatus,
+                cause,
+                initializationTimestamp);
+    }
+
+    @Override
+    public void goToWaitingForResources() {
+        final ResourceCounter desiredResources = calculateDesiredResources();
+        declarativeSlotPool.setResourceRequirements(desiredResources);
+
+        // TODO: add resourceTimeout parameter
+        transitionToState(
+                new WaitingForResources(this, LOG, desiredResources, Duration.ofSeconds(10)));
+    }
+
+    private ResourceCounter calculateDesiredResources() {
+        return slotAllocator.calculateRequiredSlots(jobInformation.getVertices());
+    }
+
+    @Override
+    public void goToExecuting(ExecutionGraph executionGraph) {
+        final ExecutionGraphHandler executionGraphHandler =
+                new ExecutionGraphHandler(
+                        executionGraph, LOG, ioExecutor, componentMainThreadExecutor);
+        final OperatorCoordinatorHandler operatorCoordinatorHandler =
+                new OperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure);
+        operatorCoordinatorHandler.initializeOperatorCoordinators(componentMainThreadExecutor);
+        operatorCoordinatorHandler.startAllOperatorCoordinators();
+
+        transitionToState(
+                new Executing(
+                        executionGraph,
+                        executionGraphHandler,
+                        operatorCoordinatorHandler,
+                        LOG,
+                        this,
+                        userCodeClassLoader));
+    }
+
+    @Override
+    public void goToCanceling(
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler) {
+        transitionToState(
+                new Canceling(
+                        this,
+                        executionGraph,
+                        executionGraphHandler,
+                        operatorCoordinatorHandler,
+                        LOG));
+    }
+
+    @Override
+    public void goToRestarting(
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Duration backoffTime) {
+        transitionToState(
+                new Restarting(
+                        this,
+                        executionGraph,
+                        executionGraphHandler,
+                        operatorCoordinatorHandler,
+                        LOG,
+                        backoffTime));
+    }
+
+    @Override
+    public void goToFailing(
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Throwable failureCause) {
+        transitionToState(
+                new Failing(
+                        this,
+                        executionGraph,
+                        executionGraphHandler,
+                        operatorCoordinatorHandler,
+                        LOG,
+                        failureCause));
+    }
+
+    @Override
+    public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
+        transitionToState(new Finished(this, archivedExecutionGraph, LOG));
+    }
+
+    @Override
+    public boolean canScaleUp(ExecutionGraph executionGraph) {
+        int availableSlots = declarativeSlotPool.getFreeSlotsInformation().size();
+
+        if (availableSlots > 0) {
+            final Optional<? extends VertexParallelism> potentialNewParallelism =
+                    slotAllocator.determineParallelism(
+                            jobInformation, declarativeSlotPool.getAllSlotsInformation());
+
+            if (potentialNewParallelism.isPresent()) {
+                int currentCumulativeParallelism = getCurrentCumulativeParallelism(executionGraph);
+                int newCumulativeParallelism =
+                        getCumulativeParallelism(potentialNewParallelism.get());
+                if (newCumulativeParallelism > currentCumulativeParallelism) {
+                    LOG.debug(
+                            "Offering scale up to scale up controller with currentCumulativeParallelism={}, newCumulativeParallelism={}",
+                            currentCumulativeParallelism,
+                            newCumulativeParallelism);
+                    return scaleUpController.canScaleUp(
+                            currentCumulativeParallelism, newCumulativeParallelism);
+                }
+            }
+        }
+        return false;
+    }
+
+    private static int getCurrentCumulativeParallelism(ExecutionGraph executionGraph) {
+        return executionGraph.getAllVertices().values().stream()
+                .map(ExecutionJobVertex::getParallelism)
+                .reduce(0, Integer::sum);
+    }
+
+    private static int getCumulativeParallelism(VertexParallelism potentialNewParallelism) {
+        return potentialNewParallelism.getMaxParallelismForVertices().values().stream()
+                .reduce(0, Integer::sum);
+    }
+
+    @Override
+    public void onFinished(ArchivedExecutionGraph archivedExecutionGraph) {
+        stopCheckpointServicesSafely(archivedExecutionGraph.getState());
+
+        if (jobStatusListener != null) {
+            jobStatusListener.jobStatusChanges(
+                    jobInformation.getJobID(),
+                    archivedExecutionGraph.getState(),
+                    archivedExecutionGraph.getStatusTimestamp(archivedExecutionGraph.getState()),
+                    archivedExecutionGraph.getFailureInfo() != null
+                            ? archivedExecutionGraph.getFailureInfo().getException()
+                            : null);
+        }
+    }
+
+    private void stopCheckpointServicesSafely(JobStatus terminalState) {
+        Exception exception = null;
+
+        try {
+            completedCheckpointStore.shutdown(terminalState, checkpointsCleaner);
+        } catch (Exception e) {
+            exception = e;
+        }
+
+        try {
+            checkpointIdCounter.shutdown(terminalState);
+        } catch (Exception e) {
+            exception = ExceptionUtils.firstOrSuppressed(e, exception);
+        }
+
+        if (exception != null) {
+            LOG.warn("Failed to stop checkpoint services.", exception);
+        }
+    }
+
+    @Override
+    public Executing.FailureResult howToHandleFailure(Throwable failure) {
+        if (ExecutionFailureHandler.isUnrecoverableError(failure)) {
+            return Executing.FailureResult.canNotRestart(
+                    new JobException("The failure is not recoverable", failure));
+        }
+
+        restartBackoffTimeStrategy.notifyFailure(failure);
+        if (restartBackoffTimeStrategy.canRestart()) {
+            return Executing.FailureResult.canRestart(
+                    Duration.ofMillis(restartBackoffTimeStrategy.getBackoffTime()));
+        } else {
+            return Executing.FailureResult.canNotRestart(
+                    new JobException(
+                            "Recovery is suppressed by " + restartBackoffTimeStrategy, failure));
+        }
+    }
+
+    @Override
+    public Executor getMainThreadExecutor() {
+        return componentMainThreadExecutor;
+    }
+
+    @Override
+    public boolean isState(State expectedState) {
+        return expectedState == this.state;
+    }
+
+    @Override
+    public void runIfState(State expectedState, Runnable action) {
+        if (isState(expectedState)) {
+            try {
+                action.run();
+            } catch (Throwable t) {
+                fatalErrorHandler.onFatalError(t);
+            }
+        } else {
+            LOG.debug(
+                    "Ignoring scheduled action because expected state {} is not the actual state {}.",
+                    expectedState,
+                    state);
+        }
+    }
+
+    @Override
+    public void runIfState(State expectedState, Runnable action, Duration delay) {
+        componentMainThreadExecutor.schedule(
+                () -> runIfState(expectedState, action), delay.toMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    // ----------------------------------------------------------------
+
+    @VisibleForTesting
+    void transitionToState(State newState) {
+        if (state != newState) {
+            LOG.debug(
+                    "Transition from state {} to {}.",
+                    state.getClass().getSimpleName(),
+                    newState.getClass().getSimpleName());
+
+            State oldState = state;
+            oldState.onLeave(newState.getClass());
+
+            state = newState;
+            newState.onEnter();
+        }
+    }
+
+    @VisibleForTesting
+    State getState() {
+        return state;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerFactory.java
new file mode 100644
index 0000000..5e7d630
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+/** Factory for the declarative scheduler. */
+public class DeclarativeSchedulerFactory implements SchedulerNGFactory {
+    @Override
+    public SchedulerNG createInstance(
+            Logger log,
+            JobGraph jobGraph,
+            Executor ioExecutor,
+            Configuration jobMasterConfiguration,
+            SlotPoolService slotPoolService,
+            ScheduledExecutorService futureExecutor,
+            ClassLoader userCodeLoader,
+            CheckpointRecoveryFactory checkpointRecoveryFactory,
+            Time rpcTimeout,
+            BlobWriter blobWriter,
+            JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            Time slotRequestTimeout,
+            ShuffleMaster<?> shuffleMaster,
+            JobMasterPartitionTracker partitionTracker,
+            ExecutionDeploymentTracker executionDeploymentTracker,
+            long initializationTimestamp,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            FatalErrorHandler fatalErrorHandler,
+            JobStatusListener jobStatusListener)
+            throws Exception {
+        final DeclarativeSlotPool declarativeSlotPool =
+                slotPoolService
+                        .castInto(DeclarativeSlotPool.class)
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "The DeclarativeScheduler requires a DeclarativeSlotPool."));
+        final RestartBackoffTimeStrategy restartBackoffTimeStrategy =
+                RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(
+                                jobGraph.getSerializedExecutionConfig()
+                                        .deserializeValue(userCodeLoader)
+                                        .getRestartStrategy(),
+                                jobMasterConfiguration,
+                                jobGraph.isCheckpointingEnabled())
+                        .create();
+        log.info(
+                "Using restart back off time strategy {} for {} ({}).",
+                restartBackoffTimeStrategy,
+                jobGraph.getName(),
+                jobGraph.getJobID());
+
+        return new DeclarativeScheduler(
+                jobGraph,
+                jobMasterConfiguration,
+                declarativeSlotPool,
+                futureExecutor,
+                ioExecutor,
+                userCodeLoader,
+                checkpointRecoveryFactory,
+                rpcTimeout,
+                blobWriter,
+                jobManagerJobMetricGroup,
+                shuffleMaster,
+                partitionTracker,
+                restartBackoffTimeStrategy,
+                executionDeploymentTracker,
+                initializationTimestamp,
+                mainThreadExecutor,
+                fatalErrorHandler,
+                jobStatusListener);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/JobGraphJobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/JobGraphJobInformation.java
new file mode 100644
index 0000000..8139321
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/JobGraphJobInformation.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.declarative.allocator.JobInformation;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/** {@link JobInformation} created from a {@link JobGraph}. */
+public class JobGraphJobInformation implements JobInformation {
+
+    private final JobGraph jobGraph;
+    private final JobID jobID;
+    private final String name;
+
+    public JobGraphJobInformation(JobGraph jobGraph) {
+        this.jobGraph = jobGraph;
+        this.jobID = jobGraph.getJobID();
+        this.name = jobGraph.getName();
+    }
+
+    @Override
+    public Collection<SlotSharingGroup> getSlotSharingGroups() {
+        return jobGraph.getSlotSharingGroups();
+    }
+
+    @Override
+    public JobInformation.VertexInformation getVertexInformation(JobVertexID jobVertexId) {
+        return new JobVertexInformation(jobGraph.findVertexByID(jobVertexId));
+    }
+
+    public JobID getJobID() {
+        return jobID;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public Iterable<JobInformation.VertexInformation> getVertices() {
+        return jobGraphVerticesToVertexInformation(jobGraph.getVertices());
+    }
+
+    public static Iterable<JobInformation.VertexInformation> jobGraphVerticesToVertexInformation(
+            Iterable<JobVertex> verticesIterable) {
+        return Iterables.transform(verticesIterable, JobVertexInformation::new);
+    }
+
+    /** Returns a copy of a jobGraph that can be mutated. */
+    public JobGraph copyJobGraph() throws IOException, ClassNotFoundException {
+        return InstantiationUtil.clone(jobGraph);
+    }
+
+    private static final class JobVertexInformation implements JobInformation.VertexInformation {
+
+        private final JobVertex jobVertex;
+
+        private JobVertexInformation(JobVertex jobVertex) {
+            this.jobVertex = jobVertex;
+        }
+
+        @Override
+        public JobVertexID getJobVertexID() {
+            return jobVertex.getID();
+        }
+
+        @Override
+        public int getParallelism() {
+            return jobVertex.getParallelism();
+        }
+
+        @Override
+        public SlotSharingGroup getSlotSharingGroup() {
+            return jobVertex.getSlotSharingGroup();
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/ParallelismAndResourceAssignments.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/ParallelismAndResourceAssignments.java
new file mode 100644
index 0000000..f2527ae
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/ParallelismAndResourceAssignments.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+
+/** Assignment of slots to execution vertices. */
+public final class ParallelismAndResourceAssignments {
+    private final Map<ExecutionVertexID, ? extends LogicalSlot> assignedSlots;
+
+    private final Map<JobVertexID, Integer> parallelismPerJobVertex;
+
+    public ParallelismAndResourceAssignments(
+            Map<ExecutionVertexID, ? extends LogicalSlot> assignedSlots,
+            Map<JobVertexID, Integer> parallelismPerJobVertex) {
+        this.assignedSlots = assignedSlots;
+        this.parallelismPerJobVertex = parallelismPerJobVertex;
+    }
+
+    public int getParallelism(JobVertexID jobVertexId) {
+        Preconditions.checkState(parallelismPerJobVertex.containsKey(jobVertexId));
+        return parallelismPerJobVertex.get(jobVertexId);
+    }
+
+    public LogicalSlot getAssignedSlot(ExecutionVertexID executionVertexId) {
+        Preconditions.checkState(assignedSlots.containsKey(executionVertexId));
+        return assignedSlots.get(executionVertexId);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java
index f9c9015..a44fe2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java
@@ -24,6 +24,14 @@ import java.util.Collection;
 
 /** Information about the job. */
 public interface JobInformation {
+    /**
+     * Returns all slot-sharing groups of the job.
+     *
+     * <p>Attention: The returned slot sharing groups should never be modified (they are indeed
+     * mutable)!
+     *
+     * @return all slot-sharing groups of the job
+     */
     Collection<SlotSharingGroup> getSlotSharingGroups();
 
     VertexInformation getVertexInformation(JobVertexID jobVertexId);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
index 5b41bad..2dce418 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
@@ -39,9 +39,9 @@ import java.util.Map;
  *
  * <p>The release process of a shared slot follows one of 2 code paths:
  *
- * <p>1)During normal execution all allocated logical slots will be returned, with the last return
+ * <p>1) During normal execution all allocated logical slots will be returned, with the last return
  * triggering the {@code externalReleaseCallback} which must eventually result in a {@link
- * #release(Throwable)} call. 2)
+ * #release(Throwable)} call.
  *
  * <p>2) If the backing physical is lost (e.g., because the providing TaskManager crashed) then
  * {@link #release(Throwable)} is called without all logical slots having been returned. The runtime
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
index 2ef0e47..810666a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
@@ -21,12 +21,15 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
 import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import org.apache.flink.runtime.scheduler.declarative.DeclarativeSchedulerFactory;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -65,7 +68,18 @@ public class SchedulerNGFactoryFactoryTest extends TestLogger {
         }
     }
 
+    @Test
+    public void fallBackToNonDeclarativeSchedulerForBatchJobsIfDeclarativeIsConfigured() {
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Declarative);
+
+        final SchedulerNGFactory schedulerNGFactory =
+                SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration, JobType.BATCH);
+
+        assertThat(schedulerNGFactory, is(not(instanceOf(DeclarativeSchedulerFactory.class))));
+    }
+
     private static SchedulerNGFactory createSchedulerNGFactory(final Configuration configuration) {
-        return SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
+        return SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration, JobType.BATCH);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index d0d25bc..3270119 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -292,7 +292,8 @@ public class JobMasterTest extends TestLogger {
                     JobMasterConfiguration.fromConfiguration(configuration);
 
             final SchedulerNGFactory schedulerNGFactory =
-                    SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
+                    SchedulerNGFactoryFactory.createSchedulerNGFactory(
+                            configuration, jobGraph.getJobType());
 
             final JobMaster jobMaster =
                     new JobMaster(
@@ -302,7 +303,8 @@ public class JobMasterTest extends TestLogger {
                             jmResourceId,
                             jobGraph,
                             haServices,
-                            SlotPoolServiceFactory.fromConfiguration(configuration),
+                            SlotPoolServiceFactory.fromConfiguration(
+                                    configuration, jobGraph.getJobType()),
                             jobManagerSharedServices,
                             heartbeatServices,
                             UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
index d97778e..bb46435 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
@@ -553,7 +553,7 @@ public class DefaultDeclarativeSlotPoolTest extends TestLogger {
     }
 
     @Nonnull
-    private static Collection<SlotOffer> createSlotOffersForResourceRequirements(
+    public static Collection<SlotOffer> createSlotOffersForResourceRequirements(
             ResourceCounter resourceRequirements) {
         Collection<SlotOffer> slotOffers = new ArrayList<>();
         int slotIndex = 0;
@@ -608,7 +608,7 @@ public class DefaultDeclarativeSlotPoolTest extends TestLogger {
     }
 
     @Nonnull
-    static Collection<SlotOffer> offerSlots(
+    public static Collection<SlotOffer> offerSlots(
             DeclarativeSlotPool slotPool, Collection<SlotOffer> slotOffers) {
         return slotPool.offerSlots(
                 slotOffers, new LocalTaskManagerLocation(), createTaskManagerGateway(null), 0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
index 26bbc09..c5c10ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
@@ -195,7 +195,8 @@ public class JobMasterBuilder {
                 highAvailabilityServices,
                 slotPoolFactory != null
                         ? slotPoolFactory
-                        : SlotPoolServiceFactory.fromConfiguration(configuration),
+                        : SlotPoolServiceFactory.fromConfiguration(
+                                configuration, jobGraph.getJobType()),
                 jobManagerSharedServices,
                 heartbeatServices,
                 UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
@@ -204,7 +205,8 @@ public class JobMasterBuilder {
                 JobMasterBuilder.class.getClassLoader(),
                 schedulerFactory != null
                         ? schedulerFactory
-                        : SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration),
+                        : SchedulerNGFactoryFactory.createSchedulerNGFactory(
+                                configuration, jobGraph.getJobType()),
                 shuffleMaster,
                 partitionTrackerFactory,
                 executionDeploymentTracker,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerBuilder.java
new file mode 100644
index 0000000..192db57
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerBuilder.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+/** Builder for {@link DeclarativeScheduler}. */
+public class DeclarativeSchedulerBuilder {
+    private static final Time DEFAULT_TIMEOUT = Time.seconds(300);
+
+    private final JobGraph jobGraph;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    private Executor ioExecutor = TestingUtils.defaultExecutor();
+    private Configuration jobMasterConfiguration = new Configuration();
+    private ScheduledExecutorService futureExecutor = TestingUtils.defaultExecutor();
+    private ClassLoader userCodeLoader = ClassLoader.getSystemClassLoader();
+    private CheckpointRecoveryFactory checkpointRecoveryFactory =
+            new StandaloneCheckpointRecoveryFactory();
+    private DeclarativeSlotPool declarativeSlotPool;
+    private Time rpcTimeout = DEFAULT_TIMEOUT;
+    private BlobWriter blobWriter = VoidBlobWriter.getInstance();
+    private JobManagerJobMetricGroup jobManagerJobMetricGroup =
+            UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
+    private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE;
+    private JobMasterPartitionTracker partitionTracker = NoOpJobMasterPartitionTracker.INSTANCE;
+    private RestartBackoffTimeStrategy restartBackoffTimeStrategy =
+            NoRestartBackoffTimeStrategy.INSTANCE;
+    private FatalErrorHandler fatalErrorHandler =
+            error ->
+                    FatalExitExceptionHandler.INSTANCE.uncaughtException(
+                            Thread.currentThread(), error);
+    private JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC, ignoredD) -> {};
+
+    public DeclarativeSchedulerBuilder(
+            final JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor) {
+        this.jobGraph = jobGraph;
+        this.mainThreadExecutor = mainThreadExecutor;
+
+        this.declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        DEFAULT_TIMEOUT,
+                        rpcTimeout);
+    }
+
+    public DeclarativeSchedulerBuilder setIoExecutor(final Executor ioExecutor) {
+        this.ioExecutor = ioExecutor;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setJobMasterConfiguration(
+            final Configuration jobMasterConfiguration) {
+        this.jobMasterConfiguration = jobMasterConfiguration;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setFutureExecutor(
+            final ScheduledExecutorService futureExecutor) {
+        this.futureExecutor = futureExecutor;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setUserCodeLoader(final ClassLoader userCodeLoader) {
+        this.userCodeLoader = userCodeLoader;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setCheckpointRecoveryFactory(
+            final CheckpointRecoveryFactory checkpointRecoveryFactory) {
+        this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setRpcTimeout(final Time rpcTimeout) {
+        this.rpcTimeout = rpcTimeout;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setBlobWriter(final BlobWriter blobWriter) {
+        this.blobWriter = blobWriter;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setJobManagerJobMetricGroup(
+            final JobManagerJobMetricGroup jobManagerJobMetricGroup) {
+        this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setShuffleMaster(final ShuffleMaster<?> shuffleMaster) {
+        this.shuffleMaster = shuffleMaster;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setPartitionTracker(
+            final JobMasterPartitionTracker partitionTracker) {
+        this.partitionTracker = partitionTracker;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setDeclarativeSlotPool(
+            DeclarativeSlotPool declarativeSlotPool) {
+        this.declarativeSlotPool = declarativeSlotPool;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setRestartBackoffTimeStrategy(
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+        this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setFatalErrorHandler(FatalErrorHandler fatalErrorHandler) {
+        this.fatalErrorHandler = fatalErrorHandler;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setJobStatusListener(JobStatusListener jobStatusListener) {
+        this.jobStatusListener = jobStatusListener;
+        return this;
+    }
+
+    public DeclarativeScheduler build() throws Exception {
+        return new DeclarativeScheduler(
+                jobGraph,
+                jobMasterConfiguration,
+                declarativeSlotPool,
+                futureExecutor,
+                ioExecutor,
+                userCodeLoader,
+                checkpointRecoveryFactory,
+                rpcTimeout,
+                blobWriter,
+                jobManagerJobMetricGroup,
+                shuffleMaster,
+                partitionTracker,
+                restartBackoffTimeStrategy,
+                new DefaultExecutionDeploymentTracker(),
+                System.currentTimeMillis(),
+                mainThreadExecutor,
+                fatalErrorHandler,
+                jobStatusListener);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerClusterITCase.java
new file mode 100644
index 0000000..f7e528d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerClusterITCase.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testtasks.OnceBlockingNoOpInvokable;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * This class contains integration tests for the declarative scheduler which start a {@link
+ * org.apache.flink.runtime.minicluster.MiniCluster} per test case.
+ */
+public class DeclarativeSchedulerClusterITCase extends TestLogger {
+
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int NUMBER_TASK_MANAGERS = 2;
+    private static final int PARALLELISM = NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS;
+
+    private final Configuration configuration = createConfiguration();
+
+    @Rule
+    public final MiniClusterResource miniClusterResource =
+            new MiniClusterResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(configuration)
+                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+                            .build());
+
+    private Configuration createConfiguration() {
+        final Configuration configuration = new Configuration();
+
+        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Declarative);
+        configuration.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return configuration;
+    }
+
+    @Test
+    public void testAutomaticScaleDownInCaseOfLostSlots() throws InterruptedException, IOException {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);
+
+        miniCluster.submitJob(jobGraph).join();
+        final CompletableFuture<JobResult> resultFuture =
+                miniCluster.requestJobResult(jobGraph.getJobID());
+
+        OnceBlockingNoOpInvokable.waitUntilOpsAreRunning();
+
+        miniCluster.terminateTaskManager(0);
+
+        final JobResult jobResult = resultFuture.join();
+
+        assertTrue(jobResult.isSuccess());
+    }
+
+    @Test
+    public void testAutomaticScaleUp() throws Exception {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        int targetInstanceCount = NUMBER_SLOTS_PER_TASK_MANAGER * (NUMBER_TASK_MANAGERS + 1);
+        final JobGraph jobGraph = createBlockingJobGraph(targetInstanceCount);
+
+        // initially only expect NUMBER_TASK_MANAGERS
+        OnceBlockingNoOpInvokable.resetFor(NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS);
+
+        log.info(
+                "Submitting job with parallelism of "
+                        + targetInstanceCount
+                        + ", to a cluster with only one TM.");
+        miniCluster.submitJob(jobGraph).join();
+        final CompletableFuture<JobResult> jobResultFuture =
+                miniCluster.requestJobResult(jobGraph.getJobID());
+
+        OnceBlockingNoOpInvokable.waitUntilOpsAreRunning();
+
+        log.info("Start additional TaskManager to scale up to the full parallelism.");
+        OnceBlockingNoOpInvokable.resetInstanceCount(); // we expect a restart
+        OnceBlockingNoOpInvokable.resetFor(targetInstanceCount);
+        miniCluster.startTaskManager();
+
+        log.info("Waiting until Invokable is running with higher parallelism");
+        OnceBlockingNoOpInvokable.waitUntilOpsAreRunning();
+
+        assertEquals(targetInstanceCount, OnceBlockingNoOpInvokable.getInstanceCount());
+
+        assertTrue(jobResultFuture.join().isSuccess());
+    }
+
+    private JobGraph createBlockingJobGraph(int parallelism) throws IOException {
+        final JobVertex blockingOperator = new JobVertex("Blocking operator");
+
+        OnceBlockingNoOpInvokable.resetFor(parallelism);
+        blockingOperator.setInvokableClass(OnceBlockingNoOpInvokable.class);
+
+        blockingOperator.setParallelism(parallelism);
+
+        final JobGraph jobGraph = new JobGraph("Blocking job.", blockingOperator);
+        jobGraph.setJobType(JobType.STREAMING);
+
+        ExecutionConfig executionConfig = new ExecutionConfig();
+        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
+        jobGraph.setExecutionConfig(executionConfig);
+
+        return jobGraph;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSimpleITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSimpleITCase.java
new file mode 100644
index 0000000..290ae9c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSimpleITCase.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/** Integration tests for the declarative scheduler. */
+public class DeclarativeSchedulerSimpleITCase extends TestLogger {
+
+    private static final int NUMBER_TASK_MANAGERS = 2;
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int PARALLELISM = 10;
+
+    private static final Configuration configuration = getConfiguration();
+
+    private static Configuration getConfiguration() {
+        final Configuration configuration = new Configuration();
+
+        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Declarative);
+        configuration.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return configuration;
+    }
+
+    @ClassRule
+    public static final MiniClusterResource MINI_CLUSTER_RESOURCE =
+            new MiniClusterResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(configuration)
+                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .build());
+
+    @Test
+    public void testSchedulingOfSimpleJob() throws Exception {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+        final JobGraph jobGraph = createJobGraph();
+
+        miniCluster.submitJob(jobGraph).join();
+
+        final JobResult jobResult = miniCluster.requestJobResult(jobGraph.getJobID()).join();
+
+        final JobExecutionResult jobExecutionResult =
+                jobResult.toJobExecutionResult(getClass().getClassLoader());
+
+        assertTrue(jobResult.isSuccess());
+    }
+
+    private JobGraph createJobGraph() {
+        final JobVertex source = new JobVertex("Source");
+        source.setInvokableClass(NoOpInvokable.class);
+        source.setParallelism(PARALLELISM);
+
+        final JobVertex sink = new JobVertex("sink");
+        sink.setInvokableClass(NoOpInvokable.class);
+        sink.setParallelism(PARALLELISM);
+
+        sink.connectNewDataSetAsInput(
+                source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+        return new JobGraph("Simple job", source, sink);
+    }
+
+    @Test
+    public void testGlobalFailoverIfTaskFails() throws IOException {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+        final JobGraph jobGraph = createOnceFailingJobGraph();
+
+        miniCluster.submitJob(jobGraph).join();
+
+        final JobResult jobResult = miniCluster.requestJobResult(jobGraph.getJobID()).join();
+
+        assertTrue(jobResult.isSuccess());
+    }
+
+    private JobGraph createOnceFailingJobGraph() throws IOException {
+        final JobVertex onceFailingOperator = new JobVertex("Once failing operator");
+
+        OnceFailingInvokable.reset();
+        onceFailingOperator.setInvokableClass(OnceFailingInvokable.class);
+
+        onceFailingOperator.setParallelism(1);
+        final JobGraph jobGraph = new JobGraph("Once failing job", onceFailingOperator);
+        jobGraph.setJobType(JobType.STREAMING);
+        ExecutionConfig executionConfig = new ExecutionConfig();
+        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
+        jobGraph.setExecutionConfig(executionConfig);
+        return jobGraph;
+    }
+
+    /** Once failing {@link AbstractInvokable}. */
+    public static final class OnceFailingInvokable extends AbstractInvokable {
+        private static volatile boolean hasFailed = false;
+
+        /**
+         * Create an Invokable task and set its environment.
+         *
+         * @param environment The environment assigned to this invokable.
+         */
+        public OnceFailingInvokable(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            if (!hasFailed && getIndexInSubtaskGroup() == 0) {
+                hasFailed = true;
+                throw new FlinkRuntimeException("Test failure.");
+            }
+        }
+
+        private static void reset() {
+            hasFailed = false;
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSlotSharingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSlotSharingITCase.java
new file mode 100644
index 0000000..06b993b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSlotSharingITCase.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/** SlotSharing tests for the declarative scheduler. */
+public class DeclarativeSchedulerSlotSharingITCase extends TestLogger {
+
+    private static final int NUMBER_TASK_MANAGERS = 1;
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 1;
+    private static final int PARALLELISM = 10;
+
+    private static Configuration getConfiguration() {
+        final Configuration configuration = new Configuration();
+
+        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Declarative);
+        configuration.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return configuration;
+    }
+
+    @ClassRule
+    public static final MiniClusterResource MINI_CLUSTER_RESOURCE =
+            new MiniClusterResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getConfiguration())
+                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .build());
+
+    @Test
+    public void testSchedulingOfJobRequiringSlotSharing() throws Exception {
+        // run job multiple times to ensure slots are cleaned up properly
+        runJob();
+        runJob();
+    }
+
+    private void runJob() throws Exception {
+        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+        final JobGraph jobGraph = createJobGraph();
+
+        miniCluster.submitJob(jobGraph).join();
+
+        final JobResult jobResult = miniCluster.requestJobResult(jobGraph.getJobID()).join();
+
+        // this throws an exception if the job failed
+        jobResult.toJobExecutionResult(getClass().getClassLoader());
+
+        assertTrue(jobResult.isSuccess());
+    }
+
+    /**
+     * Returns a JobGraph that requires slot sharing to work in order to be able to run with a
+     * single slot.
+     */
+    private static JobGraph createJobGraph() {
+        final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+
+        final JobVertex source = new JobVertex("Source");
+        source.setInvokableClass(NoOpInvokable.class);
+        source.setParallelism(PARALLELISM);
+        source.setSlotSharingGroup(slotSharingGroup);
+
+        final JobVertex sink = new JobVertex("sink");
+        sink.setInvokableClass(NoOpInvokable.class);
+        sink.setParallelism(PARALLELISM);
+        sink.setSlotSharingGroup(slotSharingGroup);
+
+        sink.connectNewDataSetAsInput(
+                source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+        final JobGraph jobGraph = new JobGraph("Simple job", source, sink);
+        jobGraph.setJobType(JobType.STREAMING);
+
+        return jobGraph;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
new file mode 100644
index 0000000..44dc879
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
+import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
+import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.offerSlots;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link DeclarativeScheduler}. */
+public class DeclarativeSchedulerTest extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+    private static final JobVertex JOB_VERTEX;
+
+    static {
+        JOB_VERTEX = new JobVertex("v1");
+        JOB_VERTEX.setParallelism(PARALLELISM);
+        JOB_VERTEX.setInvokableClass(AbstractInvokable.class);
+    }
+
+    private final ManuallyTriggeredComponentMainThreadExecutor mainThreadExecutor =
+            new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
+
+    @Test
+    public void testInitialState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        assertThat(scheduler.getState(), instanceOf(Created.class));
+    }
+
+    @Test
+    public void testIsState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        final State state = scheduler.getState();
+
+        assertThat(scheduler.isState(state), is(true));
+        assertThat(scheduler.isState(new DummyState()), is(false));
+    }
+
+    @Test
+    public void testRunIfState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(scheduler.getState(), () -> ran.set(true));
+        assertThat(ran.get(), is(true));
+    }
+
+    @Test
+    public void testRunIfStateWithStateMismatch() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(new DummyState(), () -> ran.set(true));
+        assertThat(ran.get(), is(false));
+    }
+
+    @Test
+    public void testHasEnoughResourcesReturnsFalseIfUnsatisfied() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        scheduler.startScheduling();
+
+        final ResourceCounter resourceRequirement =
+                ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), is(false));
+    }
+
+    @Test
+    public void testHasEnoughResourcesReturnsTrueIfSatisfied() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        final ResourceCounter resourceRequirement =
+                ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
+
+        offerSlots(
+                declarativeSlotPool, createSlotOffersForResourceRequirements(resourceRequirement));
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), is(true));
+    }
+
+    @Test
+    public void testHasEnoughResourcesUsesUnmatchedSlotsAsUnknown() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        final int numRequiredSlots = 1;
+        final ResourceCounter requiredResources =
+                ResourceCounter.withResource(ResourceProfile.UNKNOWN, numRequiredSlots);
+        final ResourceCounter providedResources =
+                ResourceCounter.withResource(
+                        ResourceProfile.newBuilder().setCpuCores(1).build(), numRequiredSlots);
+
+        offerSlots(declarativeSlotPool, createSlotOffersForResourceRequirements(providedResources));
+
+        assertThat(scheduler.hasEnoughResources(requiredResources), is(true));
+    }
+
+    @Test
+    public void testExecutionGraphGenerationWithAvailableResources() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        final int numAvailableSlots = 1;
+
+        offerSlots(
+                declarativeSlotPool,
+                createSlotOffersForResourceRequirements(
+                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, numAvailableSlots)));
+
+        final ExecutionGraph executionGraph =
+                scheduler.createExecutionGraphWithAvailableResources();
+
+        assertThat(
+                executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism(),
+                is(numAvailableSlots));
+    }
+
+    @Test
+    public void testFatalErrorsForwardedToFatalErrorHandler() throws Exception {
+        final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor)
+                        .setFatalErrorHandler(fatalErrorHandler)
+                        .build();
+
+        final RuntimeException exception = new RuntimeException();
+
+        scheduler.runIfState(
+                scheduler.getState(),
+                () -> {
+                    throw exception;
+                });
+
+        assertThat(fatalErrorHandler.getException(), is(exception));
+    }
+
+    // ---------------------------------------------------------------------------------------------
+    // State transition tests
+    // ---------------------------------------------------------------------------------------------
+
+    @Test
+    public void testStartSchedulingTransitionsToWaitingForResources() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        scheduler.startScheduling();
+
+        assertThat(scheduler.getState(), instanceOf(WaitingForResources.class));
+    }
+
+    @Test
+    public void testStartSchedulingSetsResourceRequirements() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        assertThat(
+                declarativeSlotPool.getResourceRequirements(),
+                contains(ResourceRequirement.create(ResourceProfile.UNKNOWN, PARALLELISM)));
+    }
+
+    /** Tests that the listener for new slots is properly set up. */
+    @Test
+    public void testResourceAcquisitionTriggersJobExecution() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        offerSlots(
+                declarativeSlotPool,
+                createSlotOffersForResourceRequirements(
+                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, PARALLELISM)));
+
+        assertThat(scheduler.getState(), instanceOf(Executing.class));
+    }
+
+    @Test
+    public void testGoToFinished() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build();
+
+        scheduler.goToFinished(archivedExecutionGraph);
+
+        assertThat(scheduler.getState(), instanceOf(Finished.class));
+    }
+
+    @Test
+    public void testGoToFinishedNotifiesJobListener() throws Exception {
+        final AtomicReference<JobStatus> jobStatusUpdate = new AtomicReference<>();
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor)
+                        .setJobStatusListener(
+                                (jobId, newJobStatus, timestamp, error) ->
+                                        jobStatusUpdate.set(newJobStatus))
+                        .build();
+
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build();
+
+        scheduler.goToFinished(archivedExecutionGraph);
+
+        assertThat(jobStatusUpdate.get(), is(archivedExecutionGraph.getState()));
+    }
+
+    @Test
+    public void testGoToFinishedShutsDownCheckpointingComponents() throws Exception {
+        final CompletableFuture<JobStatus> completedCheckpointStoreShutdownFuture =
+                new CompletableFuture<>();
+        final CompletedCheckpointStore completedCheckpointStore =
+                new TestingCompletedCheckpointStore(completedCheckpointStoreShutdownFuture);
+
+        final CompletableFuture<JobStatus> checkpointIdCounterShutdownFuture =
+                new CompletableFuture<>();
+        final CheckpointIDCounter checkpointIdCounter =
+                new TestingCheckpointIDCounter(checkpointIdCounterShutdownFuture);
+
+        final JobGraph jobGraph = createJobGraph();
+        // checkpointing components are only created if checkpointing is enabled
+        jobGraph.setSnapshotSettings(
+                new JobCheckpointingSettings(
+                        CheckpointCoordinatorConfiguration.builder().build(), null));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setCheckpointRecoveryFactory(
+                                new TestingCheckpointRecoveryFactory(
+                                        completedCheckpointStore, checkpointIdCounter))
+                        .build();
+
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build();
+
+        scheduler.goToFinished(archivedExecutionGraph);
+
+        assertThat(completedCheckpointStoreShutdownFuture.get(), is(JobStatus.FAILED));
+        assertThat(checkpointIdCounterShutdownFuture.get(), is(JobStatus.FAILED));
+    }
+
+    @Test
+    public void testTransitionToStateCallsOnEnter() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        final LifecycleMethodCapturingState firstState = new LifecycleMethodCapturingState();
+
+        scheduler.transitionToState(firstState);
+        assertThat(firstState.onEnterCalled, is(true));
+        assertThat(firstState.onLeaveCalled, is(false));
+        firstState.reset();
+    }
+
+    @Test
+    public void testTransitionToStateCallsOnLeave() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        final LifecycleMethodCapturingState firstState = new LifecycleMethodCapturingState();
+        final DummyState secondState = new DummyState();
+
+        scheduler.transitionToState(firstState);
+        firstState.reset();
+
+        scheduler.transitionToState(secondState);
+        assertThat(firstState.onEnterCalled, is(false));
+        assertThat(firstState.onLeaveCalled, is(true));
+        assertThat(firstState.onLeaveNewStateArgument, sameInstance(secondState.getClass()));
+    }
+
+    @Test
+    public void testTransitionToStateIgnoresDuplicateTransitions() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        final LifecycleMethodCapturingState state = new LifecycleMethodCapturingState();
+        scheduler.transitionToState(state);
+        state.reset();
+
+        // attempt to transition into the state we are already in
+        scheduler.transitionToState(state);
+
+        assertThat(state.onEnterCalled, is(false));
+        assertThat(state.onLeaveCalled, is(false));
+    }
+
+    // ---------------------------------------------------------------------------------------------
+    // Failure handling tests
+    // ---------------------------------------------------------------------------------------------
+
+    @Test
+    public void testHowToHandleFailureRejectedByStrategy() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor)
+                        .setRestartBackoffTimeStrategy(NoRestartBackoffTimeStrategy.INSTANCE)
+                        .build();
+
+        assertThat(scheduler.howToHandleFailure(new Exception("test")).canRestart(), is(false));
+    }
+
+    @Test
+    public void testHowToHandleFailureAllowedByStrategy() throws Exception {
+        final TestRestartBackoffTimeStrategy restartBackoffTimeStrategy =
+                new TestRestartBackoffTimeStrategy(true, 1234);
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor)
+                        .setRestartBackoffTimeStrategy(restartBackoffTimeStrategy)
+                        .build();
+
+        final Executing.FailureResult failureResult =
+                scheduler.howToHandleFailure(new Exception("test"));
+
+        assertThat(failureResult.canRestart(), is(true));
+        assertThat(
+                failureResult.getBackoffTime().toMillis(),
+                is(restartBackoffTimeStrategy.getBackoffTime()));
+    }
+
+    @Test
+    public void testHowToHandleFailureUnrecoverableFailure() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        assertThat(
+                scheduler
+                        .howToHandleFailure(new SuppressRestartsException(new Exception("test")))
+                        .canRestart(),
+                is(false));
+    }
+
+    // ---------------------------------------------------------------------------------------------
+    // Illegal state behavior tests
+    // ---------------------------------------------------------------------------------------------
+
+    @Test
+    public void testTriggerSavepointFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.triggerSavepoint("some directory", false),
+                futureFailedWith(CheckpointException.class));
+    }
+
+    @Test
+    public void testStopWithSavepointFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.stopWithSavepoint("some directory", false),
+                futureFailedWith(CheckpointException.class));
+    }
+
+    @Test(expected = TaskNotRunningException.class)
+    public void testDeliverOperatorEventToCoordinatorFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        scheduler.deliverOperatorEventToCoordinator(
+                new ExecutionAttemptID(), new OperatorID(), new TestOperatorEvent());
+    }
+
+    @Test
+    public void testDeliverCoordinationRequestToCoordinatorFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.deliverCoordinationRequestToCoordinator(
+                        new OperatorID(), new CoordinationRequest() {}),
+                futureFailedWith(FlinkException.class));
+    }
+
+    @Test
+    public void testUpdateTaskExecutionStateReturnsFalseInIllegalState() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.updateTaskExecutionState(
+                        new TaskExecutionStateTransition(
+                                new TaskExecutionState(
+                                        jobGraph.getJobID(),
+                                        new ExecutionAttemptID(),
+                                        ExecutionState.FAILED))),
+                is(false));
+    }
+
+    @Test(expected = IOException.class)
+    public void testRequestNextInputSplitFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        scheduler.requestNextInputSplit(JOB_VERTEX.getID(), new ExecutionAttemptID());
+    }
+
+    @Test(expected = PartitionProducerDisposedException.class)
+    public void testRequestPartitionStateFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        scheduler.requestPartitionState(new IntermediateDataSetID(), new ResultPartitionID());
+    }
+
+    // ---------------------------------------------------------------------------------------------
+    // Utils
+    // ---------------------------------------------------------------------------------------------
+
+    private static JobGraph createJobGraph() {
+        final JobGraph jobGraph = new JobGraph(JOB_VERTEX);
+        jobGraph.setJobType(JobType.STREAMING);
+        return jobGraph;
+    }
+
+    private static class LifecycleMethodCapturingState extends DummyState {
+        boolean onEnterCalled = false;
+        boolean onLeaveCalled = false;
+        @Nullable Class<? extends State> onLeaveNewStateArgument = null;
+
+        void reset() {
+            onEnterCalled = false;
+            onLeaveCalled = false;
+            onLeaveNewStateArgument = null;
+        }
+
+        @Override
+        public void onEnter() {
+            onEnterCalled = true;
+        }
+
+        @Override
+        public void onLeave(Class<? extends State> newState) {
+            onLeaveCalled = true;
+            onLeaveNewStateArgument = newState;
+        }
+    }
+
+    private static class DummyState implements State {
+
+        @Override
+        public void cancel() {}
+
+        @Override
+        public void suspend(Throwable cause) {}
+
+        @Override
+        public JobStatus getJobStatus() {
+            return null;
+        }
+
+        @Override
+        public ArchivedExecutionGraph getJob() {
+            return null;
+        }
+
+        @Override
+        public void handleGlobalFailure(Throwable cause) {}
+
+        @Override
+        public Logger getLogger() {
+            return null;
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
index bf83169..621afa5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
@@ -36,6 +36,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /** Tests for the {@link SharedSlot}. */
 public class SharedSlotTest extends TestLogger {
@@ -194,6 +195,16 @@ public class SharedSlotTest extends TestLogger {
                         }));
 
         sharedSlot.release(new Exception("test"));
+
+        // if all logical slots were released, and the sharedSlot no longer allows the allocation of
+        // logical slots, then the slot release was completed
+        assertThat(logicalSlot1.isAlive(), is(false));
+        assertThat(logicalSlot2.isAlive(), is(false));
+        try {
+            sharedSlot.allocateLogicalSlot();
+            fail("Allocation of logical slot should have failed because the slot was released.");
+        } catch (IllegalStateException expected) {
+        }
     }
 
     @Test(expected = IllegalStateException.class)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java
new file mode 100644
index 0000000..c053104
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Simple {@link AbstractInvokable} which blocks the first time it is run. Moreover, one can wait
+ * until n instances of this invokable are running by calling {@link #waitUntilOpsAreRunning()}.
+ *
+ * <p>Before using this class it is important to call {@link #resetFor}.
+ */
+public class OnceBlockingNoOpInvokable extends AbstractInvokable {
+
+    private static final AtomicInteger instanceCount = new AtomicInteger(0);
+
+    private static volatile CountDownLatch numOpsPending = new CountDownLatch(1);
+
+    private static volatile boolean isBlocking = true;
+
+    private final Object lock = new Object();
+
+    private volatile boolean running = true;
+
+    public OnceBlockingNoOpInvokable(Environment environment) {
+        super(environment);
+    }
+
+    @Override
+    public void invoke() throws Exception {
+
+        instanceCount.incrementAndGet();
+        numOpsPending.countDown();
+
+        synchronized (lock) {
+            while (isBlocking && running) {
+                lock.wait();
+            }
+        }
+
+        isBlocking = false;
+    }
+
+    @Override
+    public void cancel() throws Exception {
+        synchronized (lock) {
+            running = false;
+            lock.notifyAll();
+        }
+    }
+
+    public static void waitUntilOpsAreRunning() throws InterruptedException {
+        numOpsPending.await();
+    }
+
+    public static int getInstanceCount() {
+        return instanceCount.get();
+    }
+
+    public static void resetInstanceCount() {
+        instanceCount.set(0);
+    }
+
+    public static void resetFor(int parallelism) {
+        numOpsPending = new CountDownLatch(parallelism);
+        isBlocking = true;
+    }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java
new file mode 100644
index 0000000..23dacd4
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import static org.junit.Assume.assumeTrue;
+
+/** Integration tests for the declarative scheduler. */
+public class DeclarativeSchedulerITCase extends TestLogger {
+
+    private static final int NUMBER_TASK_MANAGERS = 2;
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int PARALLELISM = NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS;
+
+    private static final Configuration configuration = getConfiguration();
+
+    private static Configuration getConfiguration() {
+        final Configuration configuration = new Configuration();
+
+        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Declarative);
+        configuration.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return configuration;
+    }
+
+    @ClassRule
+    public static final MiniClusterResource MINI_CLUSTER_WITH_CLIENT_RESOURCE =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getConfiguration())
+                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .build());
+
+    /** Tests that the declarative scheduler can recover stateful operators. */
+    @Test
+    public void testGlobalFailoverCanRecoverState() throws Exception {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        env.enableCheckpointing(20L, CheckpointingMode.EXACTLY_ONCE);
+        final DataStreamSource<Integer> input = env.addSource(new SimpleSource());
+
+        input.addSink(new DiscardingSink<>());
+
+        env.execute();
+    }
+
+    /**
+     * Simple source which fails once after a successful checkpoint has been taken. Upon recovery
+     * the source will immediately terminate.
+     */
+    public static final class SimpleSource extends RichParallelSourceFunction<Integer>
+            implements CheckpointListener, CheckpointedFunction {
+
+        private static final ListStateDescriptor<Boolean> unionStateListDescriptor =
+                new ListStateDescriptor<>("state", Boolean.class);
+
+        private volatile boolean running = true;
+
+        @Nullable private ListState<Boolean> unionListState = null;
+
+        private boolean hasFailedBefore = false;
+
+        private boolean fail = false;
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            while (running && !hasFailedBefore) {
+                synchronized (ctx.getCheckpointLock()) {
+                    ctx.collect(getRuntimeContext().getIndexOfThisSubtask());
+
+                    Thread.sleep(5L);
+                }
+
+                if (fail) {
+                    throw new FlinkException("Test failure.");
+                }
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            fail = true;
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {}
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            unionListState =
+                    context.getOperatorStateStore().getUnionListState(unionStateListDescriptor);
+
+            for (Boolean previousState : unionListState.get()) {
+                hasFailedBefore |= previousState;
+            }
+
+            unionListState.clear();
+            unionListState.add(true);
+        }
+    }
+}