You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/02/18 09:14:48 UTC

[flink] branch master updated (3bfa1de -> 4f8cc4e)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 3bfa1de  [FLINK-21384][docs] Automatically copy maven dependencies to clipboard on click
     new 5eddf7c  [hotfix][tests] Move test checkpoint component classes to upper level
     new 88414e8  [hotfix][coordination] Remove leftover comment
     new f7dbe99  [hotfix][coordination] Assert that return of logical slot does not fail
     new bdf213a  [hotfix][coordination] Log that EG reached terminal state
     new b6654b1  [hotfix][conf] Honor explicit ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT settings
     new 81787d2  [hotfix][coordination] Clarify visibility
     new 687c8ff  [hotfix][coordination] Add StateWithExecutionGraph#reportCheckpointMetrics
     new a56ca86  [FLINK-21100][coordination] Adjust SharedSlot lifecycle
     new 412c048  [FLINK-21100][coordination] Refactor JMOptions#Scheduler to enum option
     new 3c46341  [FLINK-21100][coordination] Pass FatalErrorHandler to scheduler factory
     new 0efe64c  [FLINK-21100][coordination] Add DeclarativeScheduler
     new 4f8cc4e  [FLINK-21100][coordination] Add system property for enabling declarative scheduler

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/configuration/ClusterOptions.java |  24 +-
 .../flink/configuration/JobManagerOptions.java     |  17 +-
 .../dispatcher/DefaultJobManagerRunnerFactory.java |   5 +-
 .../dispatcher/SchedulerNGFactoryFactory.java      |  31 +-
 .../runtime/executiongraph/ExecutionGraph.java     |   2 +
 .../failover/flip1/ExecutionFailureHandler.java    |   4 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  |   1 +
 .../jobmaster/slotpool/SingleLogicalSlot.java      |  28 +-
 .../jobmaster/slotpool/SlotPoolServiceFactory.java |   8 +-
 .../runtime/scheduler/DefaultSchedulerFactory.java |   2 +
 .../runtime/scheduler/SchedulerNGFactory.java      |   2 +
 ...pdateSchedulerNgOnInternalFailuresListener.java |   2 +-
 .../declarative/DeclarativeScheduler.java          | 930 +++++++++++++++++++++
 .../DeclarativeSchedulerFactory.java}              |  93 +--
 .../declarative/JobGraphJobInformation.java        | 101 +++
 .../ParallelismAndResourceAssignments.java         |  50 ++
 .../declarative/StateWithExecutionGraph.java       |  14 +-
 .../declarative/allocator/JobInformation.java      |   8 +
 .../declarative/allocator/SharedSlot.java          |  39 +-
 .../ExecutionGraphCheckpointCoordinatorTest.java   | 108 ---
 .../checkpoint/TestingCheckpointIDCounter.java     |  55 ++
 .../TestingCheckpointRecoveryFactory.java          |  44 +
 .../TestingCompletedCheckpointStore.java           |  75 ++
 .../dispatcher/SchedulerNGFactoryFactoryTest.java  |  24 +-
 .../runtime/jobmaster/JobMasterSchedulerTest.java  |   2 +
 .../flink/runtime/jobmaster/JobMasterTest.java     |   6 +-
 .../slotpool/DefaultDeclarativeSlotPoolTest.java   |   4 +-
 .../runtime/jobmaster/utils/JobMasterBuilder.java  |   6 +-
 .../scheduler/TestingSchedulerNGFactory.java       |   2 +
 .../declarative/DeclarativeSchedulerBuilder.java   | 189 +++++
 .../DeclarativeSchedulerClusterITCase.java         | 147 ++++
 .../DeclarativeSchedulerSimpleITCase.java          | 163 ++++
 .../DeclarativeSchedulerSlotSharingITCase.java     | 113 +++
 .../declarative/DeclarativeSchedulerTest.java      | 617 ++++++++++++++
 .../declarative/allocator/SharedSlotTest.java      |  82 +-
 .../testtasks/OnceBlockingNoOpInvokable.java       |  88 ++
 .../scheduling/DeclarativeSchedulerITCase.java     | 150 ++++
 37 files changed, 3014 insertions(+), 222 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java
 copy flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/{DefaultSchedulerFactory.java => declarative/DeclarativeSchedulerFactory.java} (52%)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/JobGraphJobInformation.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/ParallelismAndResourceAssignments.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerBuilder.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerClusterITCase.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSimpleITCase.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSlotSharingITCase.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java
 create mode 100644 flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java


[flink] 05/12: [hotfix][conf] Honor explicit ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT settings

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b6654b1c3bea93b3d3453a5a089ec3cb43e7625d
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jan 7 22:19:02 2021 +0100

    [hotfix][conf] Honor explicit ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT settings
---
 .../main/java/org/apache/flink/configuration/ClusterOptions.java   | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
index c494312..c75428f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
@@ -132,8 +132,11 @@ public class ClusterOptions {
                             "Defines whether the cluster uses fine-grained resource management.");
 
     public static boolean isDeclarativeResourceManagementEnabled(Configuration configuration) {
-        return configuration.get(ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT)
-                && !System.getProperties().containsKey("flink.tests.disable-declarative");
+        if (configuration.contains(ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT)) {
+            return configuration.get(ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT);
+        } else {
+            return !System.getProperties().containsKey("flink.tests.disable-declarative");
+        }
     }
 
     public static boolean isFineGrainedResourceManagementEnabled(Configuration configuration) {


[flink] 03/12: [hotfix][coordination] Assert that return of logical slot does not fail

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f7dbe99955d4bcb0c6f88ef2918d01e308eafe7c
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Feb 8 20:23:51 2021 +0100

    [hotfix][coordination] Assert that return of logical slot does not fail
---
 .../jobmaster/slotpool/SingleLogicalSlot.java      | 26 ++++++++++------------
 1 file changed, 12 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
index 798b50d..21e14f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
@@ -195,20 +196,17 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
     }
 
     private void returnSlotToOwner(CompletableFuture<?> terminalStateFuture) {
-        terminalStateFuture.whenComplete(
-                (Object ignored, Throwable throwable) -> {
-                    if (state == State.RELEASING) {
-                        slotOwner.returnLogicalSlot(this);
-                    }
-
-                    markReleased();
-
-                    if (throwable != null) {
-                        releaseFuture.completeExceptionally(throwable);
-                    } else {
-                        releaseFuture.complete(null);
-                    }
-                });
+        FutureUtils.assertNoException(
+                terminalStateFuture.thenRun(
+                        () -> {
+                            if (state == State.RELEASING) {
+                                slotOwner.returnLogicalSlot(this);
+                            }
+
+                            markReleased();
+
+                            releaseFuture.complete(null);
+                        }));
     }
 
     private void markReleased() {


[flink] 12/12: [FLINK-21100][coordination] Add system property for enabling declarative scheduler

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4f8cc4ee921c5b2a73e100b00cd21f4f1e763bc8
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Feb 8 20:41:33 2021 +0100

    [FLINK-21100][coordination] Add system property for enabling declarative scheduler
---
 .../org/apache/flink/configuration/ClusterOptions.java  | 17 +++++++++++++++++
 .../runtime/dispatcher/SchedulerNGFactoryFactory.java   |  3 ++-
 .../jobmaster/slotpool/SlotPoolServiceFactory.java      |  3 +--
 3 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
index c75428f..bc69c10 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
@@ -139,6 +139,23 @@ public class ClusterOptions {
         }
     }
 
+    public static JobManagerOptions.SchedulerType getSchedulerType(Configuration configuration) {
+        if (isDeclarativeSchedulerEnabled(configuration)) {
+            return JobManagerOptions.SchedulerType.Declarative;
+        } else {
+            return configuration.get(JobManagerOptions.SCHEDULER);
+        }
+    }
+
+    public static boolean isDeclarativeSchedulerEnabled(Configuration configuration) {
+        if (configuration.contains(JobManagerOptions.SCHEDULER)) {
+            return configuration.get(JobManagerOptions.SCHEDULER)
+                    == JobManagerOptions.SchedulerType.Declarative;
+        } else {
+            return System.getProperties().containsKey("flink.tests.enable-declarative-scheduler");
+        }
+    }
+
     public static boolean isFineGrainedResourceManagementEnabled(Configuration configuration) {
         // TODO We need to bind fine-grained with declarative because in the first step we implement
         // the feature base on the declarative protocol. We would be able to support both protocols
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
index a7e5b02..e9b130d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.runtime.dispatcher;
 
+import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobType;
@@ -39,7 +40,7 @@ public final class SchedulerNGFactoryFactory {
     public static SchedulerNGFactory createSchedulerNGFactory(
             final Configuration configuration, JobType jobType) {
         JobManagerOptions.SchedulerType schedulerType =
-                configuration.get(JobManagerOptions.SCHEDULER);
+                ClusterOptions.getSchedulerType(configuration);
 
         if (schedulerType == JobManagerOptions.SchedulerType.Declarative
                 && jobType == JobType.BATCH) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
index b486747..ea28178 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
@@ -43,8 +43,7 @@ public interface SlotPoolServiceFactory {
                 Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
 
         if (ClusterOptions.isDeclarativeResourceManagementEnabled(configuration)) {
-            if (configuration.get(JobManagerOptions.SCHEDULER)
-                            == JobManagerOptions.SchedulerType.Declarative
+            if (ClusterOptions.isDeclarativeSchedulerEnabled(configuration)
                     && jobType == JobType.STREAMING) {
                 return new DeclarativeSlotPoolServiceFactory(
                         SystemClock.getInstance(), slotIdleTimeout, rpcTimeout);


[flink] 01/12: [hotfix][tests] Move test checkpoint component classes to upper level

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5eddf7ce3970f83dd9dad9bd6d037bfa81f53de6
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Feb 17 10:46:35 2021 +0100

    [hotfix][tests] Move test checkpoint component classes to upper level
---
 .../ExecutionGraphCheckpointCoordinatorTest.java   | 108 ---------------------
 .../checkpoint/TestingCheckpointIDCounter.java     |  55 +++++++++++
 .../TestingCheckpointRecoveryFactory.java          |  44 +++++++++
 .../TestingCompletedCheckpointStore.java           |  75 ++++++++++++++
 4 files changed, 174 insertions(+), 108 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index d458113..e10c9e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
@@ -39,8 +38,6 @@ import org.apache.flink.util.TestLogger;
 import org.hamcrest.Matchers;
 import org.junit.Test;
 
-import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.Matchers.is;
@@ -165,109 +162,4 @@ public class ExecutionGraphCheckpointCoordinatorTest extends TestLogger {
 
         return scheduler;
     }
-
-    private static class TestingCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
-
-        private final CompletedCheckpointStore store;
-        private final CheckpointIDCounter counter;
-
-        private TestingCheckpointRecoveryFactory(
-                CompletedCheckpointStore store, CheckpointIDCounter counter) {
-            this.store = store;
-            this.counter = counter;
-        }
-
-        @Override
-        public CompletedCheckpointStore createCheckpointStore(
-                JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) {
-            return store;
-        }
-
-        @Override
-        public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) {
-            return counter;
-        }
-    }
-
-    private static final class TestingCheckpointIDCounter implements CheckpointIDCounter {
-
-        private final CompletableFuture<JobStatus> shutdownStatus;
-
-        private TestingCheckpointIDCounter(CompletableFuture<JobStatus> shutdownStatus) {
-            this.shutdownStatus = shutdownStatus;
-        }
-
-        @Override
-        public void start() {}
-
-        @Override
-        public void shutdown(JobStatus jobStatus) {
-            shutdownStatus.complete(jobStatus);
-        }
-
-        @Override
-        public long getAndIncrement() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public long get() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public void setCount(long newId) {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-    }
-
-    private static final class TestingCompletedCheckpointStore implements CompletedCheckpointStore {
-
-        private final CompletableFuture<JobStatus> shutdownStatus;
-
-        private TestingCompletedCheckpointStore(CompletableFuture<JobStatus> shutdownStatus) {
-            this.shutdownStatus = shutdownStatus;
-        }
-
-        @Override
-        public void recover() {}
-
-        @Override
-        public void addCheckpoint(
-                CompletedCheckpoint checkpoint,
-                CheckpointsCleaner checkpointsCleaner,
-                Runnable postCleanup) {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) {
-            return null;
-        }
-
-        @Override
-        public void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner) {
-            shutdownStatus.complete(jobStatus);
-        }
-
-        @Override
-        public List<CompletedCheckpoint> getAllCheckpoints() {
-            return Collections.emptyList();
-        }
-
-        @Override
-        public int getNumberOfRetainedCheckpoints() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public int getMaxNumberOfRetainedCheckpoints() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public boolean requiresExternalizedCheckpoints() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java
new file mode 100644
index 0000000..79df955
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobStatus;
+
+import java.util.concurrent.CompletableFuture;
+
+/** Test {@link CheckpointIDCounter} implementation for testing the shutdown behavior. */
+public final class TestingCheckpointIDCounter implements CheckpointIDCounter {
+
+    private final CompletableFuture<JobStatus> shutdownStatus;
+
+    public TestingCheckpointIDCounter(CompletableFuture<JobStatus> shutdownStatus) {
+        this.shutdownStatus = shutdownStatus;
+    }
+
+    @Override
+    public void start() {}
+
+    @Override
+    public void shutdown(JobStatus jobStatus) {
+        shutdownStatus.complete(jobStatus);
+    }
+
+    @Override
+    public long getAndIncrement() {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public long get() {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public void setCount(long newId) {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
new file mode 100644
index 0000000..cda32d6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+
+/** A {@link CheckpointRecoveryFactory} that returns pre-defined checkpointing components. */
+public class TestingCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
+
+    private final CompletedCheckpointStore store;
+    private final CheckpointIDCounter counter;
+
+    public TestingCheckpointRecoveryFactory(
+            CompletedCheckpointStore store, CheckpointIDCounter counter) {
+        this.store = store;
+        this.counter = counter;
+    }
+
+    @Override
+    public CompletedCheckpointStore createCheckpointStore(
+            JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) {
+        return store;
+    }
+
+    @Override
+    public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) {
+        return counter;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java
new file mode 100644
index 0000000..9e95141
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobStatus;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/** Test {@link CompletedCheckpointStore} implementation for testing the shutdown behavior. */
+public final class TestingCompletedCheckpointStore implements CompletedCheckpointStore {
+
+    private final CompletableFuture<JobStatus> shutdownStatus;
+
+    public TestingCompletedCheckpointStore(CompletableFuture<JobStatus> shutdownStatus) {
+        this.shutdownStatus = shutdownStatus;
+    }
+
+    @Override
+    public void recover() {}
+
+    @Override
+    public void addCheckpoint(
+            CompletedCheckpoint checkpoint,
+            CheckpointsCleaner checkpointsCleaner,
+            Runnable postCleanup) {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) {
+        return null;
+    }
+
+    @Override
+    public void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner) {
+        shutdownStatus.complete(jobStatus);
+    }
+
+    @Override
+    public List<CompletedCheckpoint> getAllCheckpoints() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public int getNumberOfRetainedCheckpoints() {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public int getMaxNumberOfRetainedCheckpoints() {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public boolean requiresExternalizedCheckpoints() {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+}


[flink] 09/12: [FLINK-21100][coordination] Refactor JMOptions#Scheduler to enum option

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 412c04875dd51a3e16c1aeb3dd4df5751001c1fa
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Feb 8 18:22:32 2021 +0100

    [FLINK-21100][coordination] Refactor JMOptions#Scheduler to enum option
---
 .../org/apache/flink/configuration/JobManagerOptions.java   | 13 +++++++++----
 .../flink/runtime/dispatcher/SchedulerNGFactoryFactory.java | 11 +++++------
 .../runtime/dispatcher/SchedulerNGFactoryFactoryTest.java   |  8 +++++---
 3 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 772b1bb..06da369 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -348,17 +348,22 @@ public class JobManagerOptions {
 
     /** Config parameter determining the scheduler implementation. */
     @Documentation.ExcludeFromDocumentation("SchedulerNG is still in development.")
-    public static final ConfigOption<String> SCHEDULER =
+    public static final ConfigOption<SchedulerType> SCHEDULER =
             key("jobmanager.scheduler")
-                    .stringType()
-                    .defaultValue("ng")
+                    .enumType(SchedulerType.class)
+                    .defaultValue(SchedulerType.Ng)
                     .withDescription(
                             Description.builder()
                                     .text(
                                             "Determines which scheduler implementation is used to schedule tasks. Accepted values are:")
-                                    .list(text("'ng': new generation scheduler"))
+                                    .list(text("'Ng': new generation scheduler"))
                                     .build());
 
+    /** Type of scheduler implementation. */
+    public enum SchedulerType {
+        Ng
+    }
+
     @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
     public static final ConfigOption<SchedulerExecutionMode> SCHEDULER_MODE =
             key("scheduler-mode")
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
index 8a83a8c..34eae71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
@@ -27,21 +27,20 @@ import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
 /** Factory for {@link SchedulerNGFactory}. */
 public final class SchedulerNGFactoryFactory {
 
-    public static final String SCHEDULER_TYPE_NG = "ng";
-
     private SchedulerNGFactoryFactory() {}
 
     public static SchedulerNGFactory createSchedulerNGFactory(final Configuration configuration) {
-        final String schedulerName = configuration.getString(JobManagerOptions.SCHEDULER);
-        switch (schedulerName) {
-            case SCHEDULER_TYPE_NG:
+        final JobManagerOptions.SchedulerType schedulerType =
+                configuration.get(JobManagerOptions.SCHEDULER);
+        switch (schedulerType) {
+            case Ng:
                 return new DefaultSchedulerFactory();
 
             default:
                 throw new IllegalArgumentException(
                         String.format(
                                 "Illegal value [%s] for config option [%s]",
-                                schedulerName, JobManagerOptions.SCHEDULER.key()));
+                                schedulerType, JobManagerOptions.SCHEDULER.key()));
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
index e171d23..2ef0e47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
@@ -44,7 +44,7 @@ public class SchedulerNGFactoryFactoryTest extends TestLogger {
     @Test
     public void createSchedulerNGFactoryIfConfigured() {
         final Configuration configuration = new Configuration();
-        configuration.setString(JobManagerOptions.SCHEDULER, "ng");
+        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Ng);
 
         final SchedulerNGFactory schedulerNGFactory = createSchedulerNGFactory(configuration);
 
@@ -54,12 +54,14 @@ public class SchedulerNGFactoryFactoryTest extends TestLogger {
     @Test
     public void throwsExceptionIfSchedulerNameIsInvalid() {
         final Configuration configuration = new Configuration();
-        configuration.setString(JobManagerOptions.SCHEDULER, "invalid-scheduler-name");
+        configuration.setString(JobManagerOptions.SCHEDULER.key(), "invalid-scheduler-name");
 
         try {
             createSchedulerNGFactory(configuration);
         } catch (IllegalArgumentException e) {
-            assertThat(e.getMessage(), containsString("Illegal value [invalid-scheduler-name]"));
+            assertThat(
+                    e.getMessage(),
+                    containsString("Could not parse value 'invalid-scheduler-name'"));
         }
     }
 


[flink] 07/12: [hotfix][coordination] Add StateWithExecutionGraph#reportCheckpointMetrics

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 687c8ffca4eaab7d3fde48b5873338da7363f030
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Feb 10 16:25:03 2021 +0100

    [hotfix][coordination] Add StateWithExecutionGraph#reportCheckpointMetrics
---
 .../runtime/scheduler/declarative/StateWithExecutionGraph.java    | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StateWithExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StateWithExecutionGraph.java
index f34752e..ce29462 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StateWithExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StateWithExecutionGraph.java
@@ -180,6 +180,14 @@ abstract class StateWithExecutionGraph implements State {
         executionGraphHandler.declineCheckpoint(decline);
     }
 
+    void reportCheckpointMetrics(
+            ExecutionAttemptID executionAttemptID,
+            long checkpointId,
+            CheckpointMetrics checkpointMetrics) {
+        executionGraphHandler.reportCheckpointMetrics(
+                executionAttemptID, checkpointId, checkpointMetrics);
+    }
+
     void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
         executionGraph.updateAccumulators(accumulatorSnapshot);
     }


[flink] 11/12: [FLINK-21100][coordination] Add DeclarativeScheduler

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0efe64c4297d0f8b46ec22b4be88064d530bf488
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Feb 8 20:38:40 2021 +0100

    [FLINK-21100][coordination] Add DeclarativeScheduler
---
 .../flink/configuration/JobManagerOptions.java     |   8 +-
 .../dispatcher/DefaultJobManagerRunnerFactory.java |   5 +-
 .../dispatcher/SchedulerNGFactoryFactory.java      |  23 +-
 .../failover/flip1/ExecutionFailureHandler.java    |   4 +-
 .../jobmaster/slotpool/SlotPoolServiceFactory.java |   9 +-
 ...pdateSchedulerNgOnInternalFailuresListener.java |   2 +-
 .../declarative/DeclarativeScheduler.java          | 930 +++++++++++++++++++++
 .../declarative/DeclarativeSchedulerFactory.java   | 110 +++
 .../declarative/JobGraphJobInformation.java        | 101 +++
 .../ParallelismAndResourceAssignments.java         |  50 ++
 .../declarative/allocator/JobInformation.java      |   8 +
 .../declarative/allocator/SharedSlot.java          |   4 +-
 .../dispatcher/SchedulerNGFactoryFactoryTest.java  |  16 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |   6 +-
 .../slotpool/DefaultDeclarativeSlotPoolTest.java   |   4 +-
 .../runtime/jobmaster/utils/JobMasterBuilder.java  |   6 +-
 .../declarative/DeclarativeSchedulerBuilder.java   | 189 +++++
 .../DeclarativeSchedulerClusterITCase.java         | 147 ++++
 .../DeclarativeSchedulerSimpleITCase.java          | 163 ++++
 .../DeclarativeSchedulerSlotSharingITCase.java     | 113 +++
 .../declarative/DeclarativeSchedulerTest.java      | 617 ++++++++++++++
 .../declarative/allocator/SharedSlotTest.java      |  11 +
 .../testtasks/OnceBlockingNoOpInvokable.java       |  88 ++
 .../scheduling/DeclarativeSchedulerITCase.java     | 150 ++++
 24 files changed, 2744 insertions(+), 20 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 06da369..3e86c69 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -356,12 +356,16 @@ public class JobManagerOptions {
                             Description.builder()
                                     .text(
                                             "Determines which scheduler implementation is used to schedule tasks. Accepted values are:")
-                                    .list(text("'Ng': new generation scheduler"))
+                                    .list(
+                                            text("'Ng': new generation scheduler"),
+                                            text(
+                                                    "'Declarative': declarative scheduler; supports reactive mode"))
                                     .build());
 
     /** Type of scheduler implementation. */
     public enum SchedulerType {
-        Ng
+        Ng,
+        Declarative
     }
 
     @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
index b063a0f..839faeb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
@@ -57,9 +57,10 @@ public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
                 JobMasterConfiguration.fromConfiguration(configuration);
 
         final SlotPoolServiceFactory slotPoolFactory =
-                SlotPoolServiceFactory.fromConfiguration(configuration);
+                SlotPoolServiceFactory.fromConfiguration(configuration, jobGraph.getJobType());
         final SchedulerNGFactory schedulerNGFactory =
-                SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
+                SchedulerNGFactoryFactory.createSchedulerNGFactory(
+                        configuration, jobGraph.getJobType());
         final ShuffleMaster<?> shuffleMaster =
                 ShuffleServiceLoader.loadShuffleServiceFactory(configuration)
                         .createShuffleMaster(configuration);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
index 34eae71..a7e5b02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
@@ -21,20 +21,39 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
 import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import org.apache.flink.runtime.scheduler.declarative.DeclarativeSchedulerFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Factory for {@link SchedulerNGFactory}. */
 public final class SchedulerNGFactoryFactory {
 
+    private static final Logger LOG = LoggerFactory.getLogger(SchedulerNGFactoryFactory.class);
+
     private SchedulerNGFactoryFactory() {}
 
-    public static SchedulerNGFactory createSchedulerNGFactory(final Configuration configuration) {
-        final JobManagerOptions.SchedulerType schedulerType =
+    public static SchedulerNGFactory createSchedulerNGFactory(
+            final Configuration configuration, JobType jobType) {
+        JobManagerOptions.SchedulerType schedulerType =
                 configuration.get(JobManagerOptions.SCHEDULER);
+
+        if (schedulerType == JobManagerOptions.SchedulerType.Declarative
+                && jobType == JobType.BATCH) {
+            LOG.info(
+                    "Declarative Scheduler configured, but Batch job detected. Changing scheduler type to NG / DefaultScheduler.");
+            // overwrite
+            schedulerType = JobManagerOptions.SchedulerType.Ng;
+        }
+
         switch (schedulerType) {
             case Ng:
                 return new DefaultSchedulerFactory();
+            case Declarative:
+                return new DeclarativeSchedulerFactory();
 
             default:
                 throw new IllegalArgumentException(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
index 1bfe42c..6961b45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.runtime.executiongraph.failover.flip1;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
@@ -122,8 +121,7 @@ public class ExecutionFailureHandler {
         }
     }
 
-    @VisibleForTesting
-    static boolean isUnrecoverableError(Throwable cause) {
+    public static boolean isUnrecoverableError(Throwable cause) {
         Optional<Throwable> unrecoverableError =
                 ThrowableClassifier.findThrowableOfThrowableType(
                         cause, ThrowableType.NonRecoverableError);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
index 8a2f078..b486747 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.util.clock.SystemClock;
 
 import javax.annotation.Nonnull;
@@ -34,7 +35,7 @@ public interface SlotPoolServiceFactory {
     @Nonnull
     SlotPoolService createSlotPoolService(@Nonnull JobID jobId);
 
-    static SlotPoolServiceFactory fromConfiguration(Configuration configuration) {
+    static SlotPoolServiceFactory fromConfiguration(Configuration configuration, JobType jobType) {
         final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
         final Time slotIdleTimeout =
                 Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));
@@ -42,6 +43,12 @@ public interface SlotPoolServiceFactory {
                 Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
 
         if (ClusterOptions.isDeclarativeResourceManagementEnabled(configuration)) {
+            if (configuration.get(JobManagerOptions.SCHEDULER)
+                            == JobManagerOptions.SchedulerType.Declarative
+                    && jobType == JobType.STREAMING) {
+                return new DeclarativeSlotPoolServiceFactory(
+                        SystemClock.getInstance(), slotIdleTimeout, rpcTimeout);
+            }
 
             return new DeclarativeSlotPoolBridgeServiceFactory(
                     SystemClock.getInstance(), rpcTimeout, slotIdleTimeout, batchSlotTimeout);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalFailuresListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalFailuresListener.java
index aa3763a..b97620e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalFailuresListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalFailuresListener.java
@@ -31,7 +31,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Calls {@link SchedulerNG#updateTaskExecutionState(TaskExecutionStateTransition)} on task failure.
  * Calls {@link SchedulerNG#handleGlobalFailure(Throwable)} on global failures.
  */
-class UpdateSchedulerNgOnInternalFailuresListener implements InternalFailuresListener {
+public class UpdateSchedulerNgOnInternalFailuresListener implements InternalFailuresListener {
 
     private final SchedulerNG schedulerNg;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java
new file mode 100644
index 0000000..ac89434
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java
@@ -0,0 +1,930 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
+import org.apache.flink.runtime.scheduler.declarative.allocator.SlotAllocator;
+import org.apache.flink.runtime.scheduler.declarative.allocator.SlotSharingSlotAllocator;
+import org.apache.flink.runtime.scheduler.declarative.allocator.VertexParallelism;
+import org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ReactiveScaleUpController;
+import org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ScaleUpController;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link SchedulerNG} implementation that uses the declarative resource management and
+ * automatically adapts the parallelism in case not enough resources could be acquired to run at the
+ * configured parallelism, as described in FLIP-160.
+ *
+ * <p>This scheduler only supports jobs with streaming semantics, i.e., all vertices are connected
+ * via pipelined data-exchanges.
+ *
+ * <p>The implementation is spread over multiple {@link State} classes that control which RPCs are
+ * allowed in a given state and what state transitions are possible (see the FLIP for an overview).
+ * This class can thus be roughly split into 2 parts:
+ *
+ * <p>1) RPCs, which must forward the call to the state via {@link State#tryRun(Class,
+ * ThrowingConsumer, String)} or {@link State#tryCall(Class, FunctionWithException, String)}.
+ *
+ * <p>2) Context methods, which are called by states, to either transition into another state or
+ * access functionality of some component in the scheduler.
+ */
+public class DeclarativeScheduler
+        implements SchedulerNG,
+                Created.Context,
+                WaitingForResources.Context,
+                Executing.Context,
+                Restarting.Context,
+                Failing.Context,
+                Finished.Context {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DeclarativeScheduler.class);
+
+    private final JobGraphJobInformation jobInformation;
+
+    private final DeclarativeSlotPool declarativeSlotPool;
+
+    private final long initializationTimestamp;
+
+    private final Configuration configuration;
+    private final ScheduledExecutorService futureExecutor;
+    private final Executor ioExecutor;
+    private final ClassLoader userCodeClassLoader;
+    private final Time rpcTimeout;
+    private final BlobWriter blobWriter;
+    private final ShuffleMaster<?> shuffleMaster;
+    private final JobMasterPartitionTracker partitionTracker;
+    private final ExecutionDeploymentTracker executionDeploymentTracker;
+    private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
+
+    private final CompletedCheckpointStore completedCheckpointStore;
+    private final CheckpointIDCounter checkpointIdCounter;
+    private final CheckpointsCleaner checkpointsCleaner;
+
+    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+    private final RestartBackoffTimeStrategy restartBackoffTimeStrategy;
+
+    private final ComponentMainThreadExecutor componentMainThreadExecutor;
+    private final FatalErrorHandler fatalErrorHandler;
+
+    private final JobStatusListener jobStatusListener;
+
+    private final SlotAllocator<?> slotAllocator;
+
+    private final ScaleUpController scaleUpController;
+
+    private State state = new Created(this, LOG);
+
+    public DeclarativeScheduler(
+            JobGraph jobGraph,
+            Configuration configuration,
+            DeclarativeSlotPool declarativeSlotPool,
+            ScheduledExecutorService futureExecutor,
+            Executor ioExecutor,
+            ClassLoader userCodeClassLoader,
+            CheckpointRecoveryFactory checkpointRecoveryFactory,
+            Time rpcTimeout,
+            BlobWriter blobWriter,
+            JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            ShuffleMaster<?> shuffleMaster,
+            JobMasterPartitionTracker partitionTracker,
+            RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            ExecutionDeploymentTracker executionDeploymentTracker,
+            long initializationTimestamp,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            FatalErrorHandler fatalErrorHandler,
+            JobStatusListener jobStatusListener)
+            throws JobExecutionException {
+
+        ensureFullyPipelinedStreamingJob(jobGraph);
+
+        this.jobInformation = new JobGraphJobInformation(jobGraph);
+        this.declarativeSlotPool = declarativeSlotPool;
+        this.initializationTimestamp = initializationTimestamp;
+        this.configuration = configuration;
+        this.futureExecutor = futureExecutor;
+        this.ioExecutor = ioExecutor;
+        this.userCodeClassLoader = userCodeClassLoader;
+        this.rpcTimeout = rpcTimeout;
+        this.blobWriter = blobWriter;
+        this.shuffleMaster = shuffleMaster;
+        this.partitionTracker = partitionTracker;
+        this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+        this.executionDeploymentTracker = executionDeploymentTracker;
+        this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+        this.fatalErrorHandler = fatalErrorHandler;
+        this.completedCheckpointStore =
+                SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(
+                        jobGraph,
+                        configuration,
+                        userCodeClassLoader,
+                        checkpointRecoveryFactory,
+                        LOG);
+        this.checkpointIdCounter =
+                SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled(
+                        jobGraph, checkpointRecoveryFactory);
+        this.checkpointsCleaner = new CheckpointsCleaner();
+
+        this.slotAllocator =
+                new SlotSharingSlotAllocator(
+                        declarativeSlotPool::reserveFreeSlot,
+                        declarativeSlotPool::freeReservedSlot);
+
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
+                vertex.setParallelism(1);
+            }
+        }
+
+        declarativeSlotPool.registerNewSlotsListener(this::newResourcesAvailable);
+
+        this.componentMainThreadExecutor = mainThreadExecutor;
+        this.jobStatusListener = jobStatusListener;
+
+        this.scaleUpController = new ReactiveScaleUpController(configuration);
+    }
+
+    private static void ensureFullyPipelinedStreamingJob(JobGraph jobGraph)
+            throws RuntimeException {
+        Preconditions.checkState(
+                jobGraph.getJobType() == JobType.STREAMING,
+                "The declarative scheduler only supports streaming jobs.");
+        Preconditions.checkState(
+                jobGraph.getScheduleMode()
+                        != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+                "The declarative schedules does not support batch slot requests.");
+
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            for (JobEdge jobEdge : vertex.getInputs()) {
+                Preconditions.checkState(
+                        jobEdge.getSource().getResultType().isPipelined(),
+                        "The declarative scheduler supports pipelined data exchanges (violated by %s -> %s).",
+                        jobEdge.getSource().getProducer(),
+                        jobEdge.getTarget().getID());
+            }
+        }
+    }
+
+    private void newResourcesAvailable(Collection<? extends PhysicalSlot> physicalSlots) {
+        state.tryRun(
+                ResourceConsumer.class,
+                ResourceConsumer::notifyNewResourcesAvailable,
+                "newResourcesAvailable");
+    }
+
+    @Override
+    public void startScheduling() {
+        state.as(Created.class)
+                .orElseThrow(
+                        () ->
+                                new IllegalStateException(
+                                        "Can only start scheduling when being in Created state."))
+                .startScheduling();
+    }
+
+    @Override
+    public void suspend(Throwable cause) {
+        state.suspend(cause);
+    }
+
+    @Override
+    public void cancel() {
+        state.cancel();
+    }
+
+    @Override
+    public CompletableFuture<Void> getTerminationFuture() {
+        return terminationFuture;
+    }
+
+    @Override
+    public void handleGlobalFailure(Throwable cause) {
+        state.handleGlobalFailure(cause);
+    }
+
+    @Override
+    public boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState) {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.updateTaskExecutionState(
+                                        taskExecutionState),
+                        "updateTaskExecutionState")
+                .orElse(false);
+    }
+
+    @Override
+    public SerializedInputSplit requestNextInputSplit(
+            JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.requestNextInputSplit(
+                                        vertexID, executionAttempt),
+                        "requestNextInputSplit")
+                .orElseThrow(
+                        () -> new IOException("Scheduler is currently not executing the job."));
+    }
+
+    @Override
+    public ExecutionState requestPartitionState(
+            IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId)
+            throws PartitionProducerDisposedException {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.requestPartitionState(
+                                        intermediateResultId, resultPartitionId),
+                        "requestPartitionState")
+                .orElseThrow(() -> new PartitionProducerDisposedException(resultPartitionId));
+    }
+
+    @Override
+    public void notifyPartitionDataAvailable(ResultPartitionID partitionID) {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        stateWithExecutionGraph.notifyPartitionDataAvailable(partitionID),
+                "notifyPartitionDataAvailable");
+    }
+
+    /** Returns the job's current ArchivedExecutionGraph, as produced by the active state. */
+    @Override
+    public ArchivedExecutionGraph requestJob() {
+        return state.getJob();
+    }
+
+    /** Returns the current JobStatus, as reported by the active state. */
+    @Override
+    public JobStatus requestJobStatus() {
+        return state.getJobStatus();
+    }
+
+    /** Builds the job details from the active state's archived execution graph. */
+    @Override
+    public JobDetails requestJobDetails() {
+        return JobDetails.createDetailsForJob(state.getJob());
+    }
+
+    /**
+     * Looks up the location of a registered KvState instance.
+     *
+     * <p>KvState lookups can only be served while the active state holds an ExecutionGraph;
+     * otherwise the registration is reported as unknown.
+     */
+    @Override
+    public KvStateLocation requestKvStateLocation(JobID jobId, String registrationName)
+            throws UnknownKvStateLocation, FlinkJobNotFoundException {
+        final Optional<StateWithExecutionGraph> stateWithExecutionGraph =
+                state.as(StateWithExecutionGraph.class);
+
+        if (!stateWithExecutionGraph.isPresent()) {
+            throw new UnknownKvStateLocation(registrationName);
+        }
+
+        return stateWithExecutionGraph.get().requestKvStateLocation(jobId, registrationName);
+    }
+
+    /** Forwards a KvState registration to the state, iff an ExecutionGraph is present. */
+    @Override
+    public void notifyKvStateRegistered(
+            JobID jobId,
+            JobVertexID jobVertexId,
+            KeyGroupRange keyGroupRange,
+            String registrationName,
+            KvStateID kvStateId,
+            InetSocketAddress kvStateServerAddress)
+            throws FlinkJobNotFoundException {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        stateWithExecutionGraph.notifyKvStateRegistered(
+                                jobId,
+                                jobVertexId,
+                                keyGroupRange,
+                                registrationName,
+                                kvStateId,
+                                kvStateServerAddress),
+                "notifyKvStateRegistered");
+    }
+
+    /** Forwards a KvState un-registration to the state, iff an ExecutionGraph is present. */
+    @Override
+    public void notifyKvStateUnregistered(
+            JobID jobId,
+            JobVertexID jobVertexId,
+            KeyGroupRange keyGroupRange,
+            String registrationName)
+            throws FlinkJobNotFoundException {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        stateWithExecutionGraph.notifyKvStateUnregistered(
+                                jobId, jobVertexId, keyGroupRange, registrationName),
+                "notifyKvStateUnregistered");
+    }
+
+    /** Applies the accumulator snapshot, iff an ExecutionGraph is present. */
+    @Override
+    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        stateWithExecutionGraph.updateAccumulators(accumulatorSnapshot),
+                "updateAccumulators");
+    }
+
+    /**
+     * Triggers a savepoint for the running job.
+     *
+     * <p>If the active state holds no ExecutionGraph, the returned future is completed
+     * exceptionally with a {@link CheckpointException}.
+     */
+    @Override
+    public CompletableFuture<String> triggerSavepoint(
+            @Nullable String targetDirectory, boolean cancelJob) {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.triggerSavepoint(
+                                        targetDirectory, cancelJob),
+                        "triggerSavepoint")
+                // orElseGet: only construct the exceptional future when it is actually needed,
+                // consistent with deliverCoordinationRequestToCoordinator
+                .orElseGet(
+                        () ->
+                                FutureUtils.completedExceptionally(
+                                        new CheckpointException(
+                                                "The Flink job is currently not executing.",
+                                                CheckpointFailureReason
+                                                        .TRIGGER_CHECKPOINT_FAILURE)));
+    }
+
+    /** Acknowledges a checkpoint of the given execution attempt, iff an ExecutionGraph is present. */
+    @Override
+    public void acknowledgeCheckpoint(
+            JobID jobID,
+            ExecutionAttemptID executionAttemptID,
+            long checkpointId,
+            CheckpointMetrics checkpointMetrics,
+            TaskStateSnapshot checkpointState) {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        stateWithExecutionGraph.acknowledgeCheckpoint(
+                                jobID,
+                                executionAttemptID,
+                                checkpointId,
+                                checkpointMetrics,
+                                checkpointState),
+                "acknowledgeCheckpoint");
+    }
+
+    /**
+     * Reports checkpoint metrics for the given execution attempt. Note that the {@code jobID}
+     * parameter is not forwarded to the state.
+     */
+    @Override
+    public void reportCheckpointMetrics(
+            JobID jobID,
+            ExecutionAttemptID executionAttemptID,
+            long checkpointId,
+            CheckpointMetrics checkpointMetrics) {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        stateWithExecutionGraph.reportCheckpointMetrics(
+                                executionAttemptID, checkpointId, checkpointMetrics),
+                "reportCheckpointMetrics");
+    }
+
+    /** Forwards a checkpoint decline message to the state, iff an ExecutionGraph is present. */
+    @Override
+    public void declineCheckpoint(DeclineCheckpoint decline) {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph -> stateWithExecutionGraph.declineCheckpoint(decline),
+                "declineCheckpoint");
+    }
+
+    /**
+     * Stops the job with a savepoint.
+     *
+     * <p>If the active state holds no ExecutionGraph, the returned future is completed
+     * exceptionally with a {@link CheckpointException}.
+     */
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(
+            String targetDirectory, boolean advanceToEndOfEventTime) {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.stopWithSavepoint(
+                                        targetDirectory, advanceToEndOfEventTime),
+                        "stopWithSavepoint")
+                // orElseGet: only construct the exceptional future when it is actually needed,
+                // consistent with deliverCoordinationRequestToCoordinator
+                .orElseGet(
+                        () ->
+                                FutureUtils.completedExceptionally(
+                                        new CheckpointException(
+                                                "The Flink job is currently not executing.",
+                                                CheckpointFailureReason
+                                                        .TRIGGER_CHECKPOINT_FAILURE)));
+    }
+
+    /**
+     * Delivers an operator event to the coordinator of the given operator.
+     *
+     * @throws TaskNotRunningException if the active state holds no ExecutionGraph
+     */
+    @Override
+    public void deliverOperatorEventToCoordinator(
+            ExecutionAttemptID taskExecution, OperatorID operator, OperatorEvent evt)
+            throws FlinkException {
+        final StateWithExecutionGraph stateWithExecutionGraph =
+                state.as(StateWithExecutionGraph.class)
+                        .orElseThrow(
+                                () ->
+                                        new TaskNotRunningException(
+                                                "Task is not known or in state running on the JobManager."));
+
+        stateWithExecutionGraph.deliverOperatorEventToCoordinator(taskExecution, operator, evt);
+    }
+
+    /**
+     * Delivers a coordination request to the coordinator of the given operator; completes the
+     * returned future exceptionally if no ExecutionGraph is present.
+     */
+    @Override
+    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+            OperatorID operator, CoordinationRequest request) throws FlinkException {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.deliverCoordinationRequestToCoordinator(
+                                        operator, request),
+                        "deliverCoordinationRequestToCoordinator")
+                .orElseGet(
+                        () ->
+                                FutureUtils.completedExceptionally(
+                                        new FlinkException(
+                                                "Coordinator of operator "
+                                                        + operator
+                                                        + " does not exist")));
+    }
+
+    // ----------------------------------------------------------------
+
+    /**
+     * Checks whether the free slots of the slot pool can satisfy the desired resources.
+     *
+     * <p>Every free slot is greedily matched against the outstanding requirements: a slot whose
+     * profile is explicitly required reduces that requirement, any other slot is counted against
+     * the UNKNOWN bucket.
+     */
+    @Override
+    public boolean hasEnoughResources(ResourceCounter desiredResources) {
+        ResourceCounter missingResources = desiredResources;
+
+        for (SlotInfo slotInfo : declarativeSlotPool.getFreeSlotsInformation()) {
+            if (missingResources.isEmpty()) {
+                break;
+            }
+
+            final ResourceProfile slotProfile = slotInfo.getResourceProfile();
+            if (missingResources.containsResource(slotProfile)) {
+                missingResources = missingResources.subtract(slotProfile, 1);
+            } else {
+                missingResources = missingResources.subtract(ResourceProfile.UNKNOWN, 1);
+            }
+        }
+
+        return missingResources.isEmpty();
+    }
+
+    /**
+     * Determines a per-vertex parallelism from the currently free slots and reserves the matching
+     * slots for the execution vertices.
+     *
+     * @throws JobExecutionException if the free slots do not suffice to schedule the job
+     */
+    private <T extends VertexParallelism>
+            ParallelismAndResourceAssignments determineParallelismAndAssignResources(
+                    SlotAllocator<T> slotAllocator) throws JobExecutionException {
+
+        final T vertexParallelism =
+                slotAllocator
+                        .determineParallelism(
+                                jobInformation, declarativeSlotPool.getFreeSlotsInformation())
+                        .orElseThrow(
+                                () ->
+                                        new JobExecutionException(
+                                                jobInformation.getJobID(),
+                                                "Not enough resources available for scheduling."));
+
+        final Map<ExecutionVertexID, LogicalSlot> slotAssignments =
+                slotAllocator.reserveResources(vertexParallelism);
+
+        return new ParallelismAndResourceAssignments(
+                slotAssignments, vertexParallelism.getMaxParallelismForVertices());
+    }
+
+    /**
+     * Creates an ExecutionGraph whose per-vertex parallelism matches the currently available
+     * slots, assigns those slots to the execution vertices and starts the graph.
+     *
+     * <p>The order of operations matters: the graph is started and transitioned to RUNNING before
+     * the failure listener is registered and the slots are assigned.
+     */
+    @Override
+    public ExecutionGraph createExecutionGraphWithAvailableResources() throws Exception {
+        final ParallelismAndResourceAssignments parallelismAndResourceAssignments =
+                determineParallelismAndAssignResources(slotAllocator);
+
+        // adjust parallelism on a copy of the JobGraph so the original stays untouched
+        JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
+        for (JobVertex vertex : adjustedJobGraph.getVertices()) {
+            vertex.setParallelism(parallelismAndResourceAssignments.getParallelism(vertex.getID()));
+        }
+
+        final ExecutionGraph executionGraph = createExecutionGraphAndRestoreState(adjustedJobGraph);
+
+        executionGraph.start(componentMainThreadExecutor);
+        executionGraph.transitionToRunning();
+
+        executionGraph.setInternalTaskFailuresListener(
+                new UpdateSchedulerNgOnInternalFailuresListener(this, jobInformation.getJobID()));
+
+        for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
+            final LogicalSlot assignedSlot =
+                    parallelismAndResourceAssignments.getAssignedSlot(executionVertex.getID());
+            executionVertex
+                    .getCurrentExecutionAttempt()
+                    // NOTE(review): semantics of the boolean flag are not visible here — confirm
+                    // against Execution#registerProducedPartitions
+                    .registerProducedPartitions(assignedSlot.getTaskManagerLocation(), false);
+            executionVertex.tryAssignResource(assignedSlot);
+        }
+        return executionGraph;
+    }
+
+    /**
+     * Builds a new ExecutionGraph from the (parallelism-adjusted) JobGraph and restores its state
+     * from the latest checkpoint, or — if none is found — from the configured savepoint.
+     */
+    private ExecutionGraph createExecutionGraphAndRestoreState(JobGraph adjustedJobGraph)
+            throws Exception {
+        ExecutionDeploymentListener executionDeploymentListener =
+                new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
+        // stop tracking a deployment as soon as its execution reaches a terminal state
+        ExecutionStateUpdateListener executionStateUpdateListener =
+                (execution, newState) -> {
+                    if (newState.isTerminal()) {
+                        executionDeploymentTracker.stopTrackingDeploymentOf(execution);
+                    }
+                };
+
+        final ExecutionGraph newExecutionGraph =
+                ExecutionGraphBuilder.buildGraph(
+                        adjustedJobGraph,
+                        configuration,
+                        futureExecutor,
+                        ioExecutor,
+                        userCodeClassLoader,
+                        completedCheckpointStore,
+                        checkpointsCleaner,
+                        checkpointIdCounter,
+                        rpcTimeout,
+                        jobManagerJobMetricGroup,
+                        blobWriter,
+                        LOG,
+                        shuffleMaster,
+                        partitionTracker,
+                        executionDeploymentListener,
+                        executionStateUpdateListener,
+                        initializationTimestamp);
+
+        final CheckpointCoordinator checkpointCoordinator =
+                newExecutionGraph.getCheckpointCoordinator();
+
+        // a null coordinator means checkpointing is disabled; nothing to restore then
+        if (checkpointCoordinator != null) {
+            // check whether we find a valid checkpoint
+            if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
+                    new HashSet<>(newExecutionGraph.getAllVertices().values()))) {
+
+                // check whether we can restore from a savepoint
+                tryRestoreExecutionGraphFromSavepoint(
+                        newExecutionGraph, adjustedJobGraph.getSavepointRestoreSettings());
+            }
+        }
+
+        return newExecutionGraph;
+    }
+
+    /**
+     * Restores the given {@link ExecutionGraph} from the provided {@link
+     * SavepointRestoreSettings}, iff a savepoint restore is configured and checkpointing is
+     * enabled for the job.
+     *
+     * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored
+     * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about
+     *     the savepoint to restore from
+     * @throws Exception if the {@link ExecutionGraph} could not be restored
+     */
+    private void tryRestoreExecutionGraphFromSavepoint(
+            ExecutionGraph executionGraphToRestore,
+            SavepointRestoreSettings savepointRestoreSettings)
+            throws Exception {
+        if (!savepointRestoreSettings.restoreSavepoint()) {
+            return;
+        }
+
+        final CheckpointCoordinator checkpointCoordinator =
+                executionGraphToRestore.getCheckpointCoordinator();
+        if (checkpointCoordinator == null) {
+            // checkpointing is disabled for this job; nothing to restore
+            return;
+        }
+
+        checkpointCoordinator.restoreSavepoint(
+                savepointRestoreSettings.getRestorePath(),
+                savepointRestoreSettings.allowNonRestoredState(),
+                executionGraphToRestore.getAllVertices(),
+                userCodeClassLoader);
+    }
+
+    /**
+     * Creates an ArchivedExecutionGraph for the given status/cause from the job's static
+     * information, i.e. without requiring a real ExecutionGraph.
+     */
+    @Override
+    public ArchivedExecutionGraph getArchivedExecutionGraph(
+            JobStatus jobStatus, @Nullable Throwable cause) {
+        return ArchivedExecutionGraph.createFromInitializingJob(
+                jobInformation.getJobID(),
+                jobInformation.getName(),
+                jobStatus,
+                cause,
+                initializationTimestamp);
+    }
+
+    /**
+     * Declares the job's desired resources at the slot pool and transitions to the
+     * WaitingForResources state.
+     */
+    @Override
+    public void goToWaitingForResources() {
+        final ResourceCounter desiredResources = calculateDesiredResources();
+        declarativeSlotPool.setResourceRequirements(desiredResources);
+
+        // TODO: add resourceTimeout parameter
+        transitionToState(
+                new WaitingForResources(this, LOG, desiredResources, Duration.ofSeconds(10)));
+    }
+
+    /** Computes the slots required to run all vertices of the job. */
+    private ResourceCounter calculateDesiredResources() {
+        return slotAllocator.calculateRequiredSlots(jobInformation.getVertices());
+    }
+
+    /**
+     * Transitions to the Executing state for the given graph: creates the execution-graph and
+     * operator-coordinator handlers and initializes/starts all operator coordinators first.
+     */
+    @Override
+    public void goToExecuting(ExecutionGraph executionGraph) {
+        final ExecutionGraphHandler executionGraphHandler =
+                new ExecutionGraphHandler(
+                        executionGraph, LOG, ioExecutor, componentMainThreadExecutor);
+        final OperatorCoordinatorHandler operatorCoordinatorHandler =
+                new OperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure);
+        operatorCoordinatorHandler.initializeOperatorCoordinators(componentMainThreadExecutor);
+        operatorCoordinatorHandler.startAllOperatorCoordinators();
+
+        transitionToState(
+                new Executing(
+                        executionGraph,
+                        executionGraphHandler,
+                        operatorCoordinatorHandler,
+                        LOG,
+                        this,
+                        userCodeClassLoader));
+    }
+
+    /** Transitions to the Canceling state, reusing the given handlers. */
+    @Override
+    public void goToCanceling(
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler) {
+        transitionToState(
+                new Canceling(
+                        this,
+                        executionGraph,
+                        executionGraphHandler,
+                        operatorCoordinatorHandler,
+                        LOG));
+    }
+
+    /** Transitions to the Restarting state; the restart is delayed by the given back-off time. */
+    @Override
+    public void goToRestarting(
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Duration backoffTime) {
+        transitionToState(
+                new Restarting(
+                        this,
+                        executionGraph,
+                        executionGraphHandler,
+                        operatorCoordinatorHandler,
+                        LOG,
+                        backoffTime));
+    }
+
+    /** Transitions to the Failing state, recording the failure cause. */
+    @Override
+    public void goToFailing(
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Throwable failureCause) {
+        transitionToState(
+                new Failing(
+                        this,
+                        executionGraph,
+                        executionGraphHandler,
+                        operatorCoordinatorHandler,
+                        LOG,
+                        failureCause));
+    }
+
+    /** Transitions to the terminal Finished state for the given archived graph. */
+    @Override
+    public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
+        transitionToState(new Finished(this, archivedExecutionGraph, LOG));
+    }
+
+    /**
+     * Checks whether newly available slots would allow running the job with a higher cumulative
+     * parallelism and, if so, asks the scale-up controller whether rescaling is worthwhile.
+     */
+    @Override
+    public boolean canScaleUp(ExecutionGraph executionGraph) {
+        int availableSlots = declarativeSlotPool.getFreeSlotsInformation().size();
+
+        // without free slots there is nothing to scale up to
+        if (availableSlots > 0) {
+            // note: parallelism is determined over ALL slots, not only the free ones
+            final Optional<? extends VertexParallelism> potentialNewParallelism =
+                    slotAllocator.determineParallelism(
+                            jobInformation, declarativeSlotPool.getAllSlotsInformation());
+
+            if (potentialNewParallelism.isPresent()) {
+                int currentCumulativeParallelism = getCurrentCumulativeParallelism(executionGraph);
+                int newCumulativeParallelism =
+                        getCumulativeParallelism(potentialNewParallelism.get());
+                if (newCumulativeParallelism > currentCumulativeParallelism) {
+                    LOG.debug(
+                            "Offering scale up to scale up controller with currentCumulativeParallelism={}, newCumulativeParallelism={}",
+                            currentCumulativeParallelism,
+                            newCumulativeParallelism);
+                    return scaleUpController.canScaleUp(
+                            currentCumulativeParallelism, newCumulativeParallelism);
+                }
+            }
+        }
+        return false;
+    }
+
+    /** Sums the current parallelism over all job vertices of the given ExecutionGraph. */
+    private static int getCurrentCumulativeParallelism(ExecutionGraph executionGraph) {
+        return executionGraph.getAllVertices().values().stream()
+                .mapToInt(ExecutionJobVertex::getParallelism)
+                .sum();
+    }
+
+    /** Sums the proposed maximum parallelism over all vertices of the candidate assignment. */
+    private static int getCumulativeParallelism(VertexParallelism potentialNewParallelism) {
+        return potentialNewParallelism.getMaxParallelismForVertices().values().stream()
+                .mapToInt(Integer::intValue)
+                .sum();
+    }
+
+    /**
+     * Callback once the job reached a terminal state: shuts down the checkpoint services and, if a
+     * listener is registered, publishes the final status change.
+     */
+    @Override
+    public void onFinished(ArchivedExecutionGraph archivedExecutionGraph) {
+        stopCheckpointServicesSafely(archivedExecutionGraph.getState());
+
+        if (jobStatusListener != null) {
+            jobStatusListener.jobStatusChanges(
+                    jobInformation.getJobID(),
+                    archivedExecutionGraph.getState(),
+                    archivedExecutionGraph.getStatusTimestamp(archivedExecutionGraph.getState()),
+                    archivedExecutionGraph.getFailureInfo() != null
+                            ? archivedExecutionGraph.getFailureInfo().getException()
+                            : null);
+        }
+    }
+
+    /**
+     * Shuts down the completed-checkpoint store and the checkpoint ID counter, collecting any
+     * failures and logging them as a single warning instead of propagating them.
+     */
+    private void stopCheckpointServicesSafely(JobStatus terminalState) {
+        Exception firstError = null;
+
+        // shut down the store first, then the counter — each failure is recorded, not thrown
+        try {
+            completedCheckpointStore.shutdown(terminalState, checkpointsCleaner);
+        } catch (Exception e) {
+            firstError = e;
+        }
+
+        try {
+            checkpointIdCounter.shutdown(terminalState);
+        } catch (Exception e) {
+            firstError = ExceptionUtils.firstOrSuppressed(e, firstError);
+        }
+
+        if (firstError != null) {
+            LOG.warn("Failed to stop checkpoint services.", firstError);
+        }
+    }
+
+    /**
+     * Decides how a task failure is handled: restart after the strategy's back-off time, or fail
+     * the job if the error is unrecoverable or the restart strategy forbids further restarts.
+     */
+    @Override
+    public Executing.FailureResult howToHandleFailure(Throwable failure) {
+        if (ExecutionFailureHandler.isUnrecoverableError(failure)) {
+            return Executing.FailureResult.canNotRestart(
+                    new JobException("The failure is not recoverable", failure));
+        }
+
+        // record the failure first so the strategy can base its decision on it
+        restartBackoffTimeStrategy.notifyFailure(failure);
+
+        if (!restartBackoffTimeStrategy.canRestart()) {
+            return Executing.FailureResult.canNotRestart(
+                    new JobException(
+                            "Recovery is suppressed by " + restartBackoffTimeStrategy, failure));
+        }
+
+        return Executing.FailureResult.canRestart(
+                Duration.ofMillis(restartBackoffTimeStrategy.getBackoffTime()));
+    }
+
+    /** Returns the scheduler's main thread executor. */
+    @Override
+    public Executor getMainThreadExecutor() {
+        return componentMainThreadExecutor;
+    }
+
+    /** Returns true iff the given state instance is the active state (identity comparison). */
+    @Override
+    public boolean isState(State expectedState) {
+        return expectedState == this.state;
+    }
+
+    /**
+     * Runs the given action iff the expected state is still active; a throwing action is treated
+     * as a fatal error.
+     */
+    @Override
+    public void runIfState(State expectedState, Runnable action) {
+        if (!isState(expectedState)) {
+            LOG.debug(
+                    "Ignoring scheduled action because expected state {} is not the actual state {}.",
+                    expectedState,
+                    state);
+            return;
+        }
+
+        try {
+            action.run();
+        } catch (Throwable t) {
+            // a failing state action indicates a bug; escalate instead of swallowing it
+            fatalErrorHandler.onFatalError(t);
+        }
+    }
+
+    /** Schedules the action after the given delay; it only runs if the state is still active. */
+    @Override
+    public void runIfState(State expectedState, Runnable action, Duration delay) {
+        componentMainThreadExecutor.schedule(
+                () -> runIfState(expectedState, action), delay.toMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    // ----------------------------------------------------------------
+
+    @VisibleForTesting
+    void transitionToState(State newState) {
+        if (state != newState) {
+            LOG.debug(
+                    "Transition from state {} to {}.",
+                    state.getClass().getSimpleName(),
+                    newState.getClass().getSimpleName());
+
+            State oldState = state;
+            oldState.onLeave(newState.getClass());
+
+            state = newState;
+            newState.onEnter();
+        }
+    }
+
+    @VisibleForTesting
+    State getState() {
+        return state;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerFactory.java
new file mode 100644
index 0000000..5e7d630
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+/** Factory for the declarative scheduler. */
+public class DeclarativeSchedulerFactory implements SchedulerNGFactory {
+
+    /**
+     * Creates a {@link DeclarativeScheduler}; requires the slot pool service to be a
+     * {@link DeclarativeSlotPool}.
+     */
+    @Override
+    public SchedulerNG createInstance(
+            Logger log,
+            JobGraph jobGraph,
+            Executor ioExecutor,
+            Configuration jobMasterConfiguration,
+            SlotPoolService slotPoolService,
+            ScheduledExecutorService futureExecutor,
+            ClassLoader userCodeLoader,
+            CheckpointRecoveryFactory checkpointRecoveryFactory,
+            Time rpcTimeout,
+            BlobWriter blobWriter,
+            JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            Time slotRequestTimeout,
+            ShuffleMaster<?> shuffleMaster,
+            JobMasterPartitionTracker partitionTracker,
+            ExecutionDeploymentTracker executionDeploymentTracker,
+            long initializationTimestamp,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            FatalErrorHandler fatalErrorHandler,
+            JobStatusListener jobStatusListener)
+            throws Exception {
+        final DeclarativeSlotPool declarativeSlotPool =
+                slotPoolService
+                        .castInto(DeclarativeSlotPool.class)
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "The DeclarativeScheduler requires a DeclarativeSlotPool."));
+
+        final RestartBackoffTimeStrategy restartBackoffTimeStrategy =
+                createRestartBackoffTimeStrategy(jobGraph, jobMasterConfiguration, userCodeLoader);
+        log.info(
+                "Using restart back off time strategy {} for {} ({}).",
+                restartBackoffTimeStrategy,
+                jobGraph.getName(),
+                jobGraph.getJobID());
+
+        return new DeclarativeScheduler(
+                jobGraph,
+                jobMasterConfiguration,
+                declarativeSlotPool,
+                futureExecutor,
+                ioExecutor,
+                userCodeLoader,
+                checkpointRecoveryFactory,
+                rpcTimeout,
+                blobWriter,
+                jobManagerJobMetricGroup,
+                shuffleMaster,
+                partitionTracker,
+                restartBackoffTimeStrategy,
+                executionDeploymentTracker,
+                initializationTimestamp,
+                mainThreadExecutor,
+                fatalErrorHandler,
+                jobStatusListener);
+    }
+
+    /** Builds the restart back-off strategy from the job's execution config and configuration. */
+    private static RestartBackoffTimeStrategy createRestartBackoffTimeStrategy(
+            JobGraph jobGraph, Configuration jobMasterConfiguration, ClassLoader userCodeLoader)
+            throws Exception {
+        return RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(
+                        jobGraph.getSerializedExecutionConfig()
+                                .deserializeValue(userCodeLoader)
+                                .getRestartStrategy(),
+                        jobMasterConfiguration,
+                        jobGraph.isCheckpointingEnabled())
+                .create();
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/JobGraphJobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/JobGraphJobInformation.java
new file mode 100644
index 0000000..8139321
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/JobGraphJobInformation.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.declarative.allocator.JobInformation;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/** {@link JobInformation} created from a {@link JobGraph}. */
+public class JobGraphJobInformation implements JobInformation {
+
+    private final JobGraph jobGraph;
+    // cached so accessors do not have to go back to the JobGraph
+    private final JobID jobId;
+    private final String jobName;
+
+    public JobGraphJobInformation(JobGraph jobGraph) {
+        this.jobGraph = jobGraph;
+        this.jobId = jobGraph.getJobID();
+        this.jobName = jobGraph.getName();
+    }
+
+    public JobID getJobID() {
+        return jobId;
+    }
+
+    public String getName() {
+        return jobName;
+    }
+
+    @Override
+    public Collection<SlotSharingGroup> getSlotSharingGroups() {
+        return jobGraph.getSlotSharingGroups();
+    }
+
+    @Override
+    public JobInformation.VertexInformation getVertexInformation(JobVertexID jobVertexId) {
+        return new JobVertexInformation(jobGraph.findVertexByID(jobVertexId));
+    }
+
+    /** Returns a transformed view of all vertices of the underlying job graph. */
+    public Iterable<JobInformation.VertexInformation> getVertices() {
+        return jobGraphVerticesToVertexInformation(jobGraph.getVertices());
+    }
+
+    /** Wraps each {@link JobVertex} into a {@link JobInformation.VertexInformation} view. */
+    public static Iterable<JobInformation.VertexInformation> jobGraphVerticesToVertexInformation(
+            Iterable<JobVertex> verticesIterable) {
+        return Iterables.transform(verticesIterable, JobVertexInformation::new);
+    }
+
+    /** Returns a copy of a jobGraph that can be mutated. */
+    public JobGraph copyJobGraph() throws IOException, ClassNotFoundException {
+        return InstantiationUtil.clone(jobGraph);
+    }
+
+    /** Read-only view onto a single {@link JobVertex}. */
+    private static final class JobVertexInformation implements JobInformation.VertexInformation {
+
+        private final JobVertex jobVertex;
+
+        private JobVertexInformation(JobVertex jobVertex) {
+            this.jobVertex = jobVertex;
+        }
+
+        @Override
+        public JobVertexID getJobVertexID() {
+            return jobVertex.getID();
+        }
+
+        @Override
+        public int getParallelism() {
+            return jobVertex.getParallelism();
+        }
+
+        @Override
+        public SlotSharingGroup getSlotSharingGroup() {
+            return jobVertex.getSlotSharingGroup();
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/ParallelismAndResourceAssignments.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/ParallelismAndResourceAssignments.java
new file mode 100644
index 0000000..f2527ae
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/ParallelismAndResourceAssignments.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+
+/** Parallelism of each job vertex and the assignment of slots to execution vertices. */
+public final class ParallelismAndResourceAssignments {
+    private final Map<ExecutionVertexID, ? extends LogicalSlot> assignedSlots;
+
+    private final Map<JobVertexID, Integer> parallelismPerJobVertex;
+
+    public ParallelismAndResourceAssignments(
+            Map<ExecutionVertexID, ? extends LogicalSlot> assignedSlots,
+            Map<JobVertexID, Integer> parallelismPerJobVertex) {
+        this.assignedSlots = assignedSlots;
+        this.parallelismPerJobVertex = parallelismPerJobVertex;
+    }
+
+    public int getParallelism(JobVertexID jobVertexId) {
+        Preconditions.checkState(parallelismPerJobVertex.containsKey(jobVertexId));
+        return parallelismPerJobVertex.get(jobVertexId);
+    }
+
+    public LogicalSlot getAssignedSlot(ExecutionVertexID executionVertexId) {
+        Preconditions.checkState(assignedSlots.containsKey(executionVertexId));
+        return assignedSlots.get(executionVertexId);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java
index f9c9015..a44fe2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java
@@ -24,6 +24,14 @@ import java.util.Collection;
 
 /** Information about the job. */
 public interface JobInformation {
+    /**
+     * Returns all slot-sharing groups of the job.
+     *
+     * <p>Attention: The returned slot sharing groups should never be modified (they are indeed
+     * mutable)!
+     *
+     * @return all slot-sharing groups of the job
+     */
     Collection<SlotSharingGroup> getSlotSharingGroups();
 
     VertexInformation getVertexInformation(JobVertexID jobVertexId);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
index 5b41bad..2dce418 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
@@ -39,9 +39,9 @@ import java.util.Map;
  *
  * <p>The release process of a shared slot follows one of 2 code paths:
  *
- * <p>1)During normal execution all allocated logical slots will be returned, with the last return
+ * <p>1) During normal execution all allocated logical slots will be returned, with the last return
  * triggering the {@code externalReleaseCallback} which must eventually result in a {@link
- * #release(Throwable)} call. 2)
+ * #release(Throwable)} call.
  *
 * <p>2) If the backing physical slot is lost (e.g., because the providing TaskManager crashed) then
  * {@link #release(Throwable)} is called without all logical slots having been returned. The runtime
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
index 2ef0e47..810666a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
@@ -21,12 +21,15 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
 import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import org.apache.flink.runtime.scheduler.declarative.DeclarativeSchedulerFactory;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -65,7 +68,18 @@ public class SchedulerNGFactoryFactoryTest extends TestLogger {
         }
     }
 
+    @Test
+    public void fallBackToNonDeclarativeSchedulerForBatchJobsIfDeclarativeIsConfigured() {
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Declarative);
+
+        final SchedulerNGFactory schedulerNGFactory =
+                SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration, JobType.BATCH);
+
+        assertThat(schedulerNGFactory, is(not(instanceOf(DeclarativeSchedulerFactory.class))));
+    }
+
     private static SchedulerNGFactory createSchedulerNGFactory(final Configuration configuration) {
-        return SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
+        return SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration, JobType.BATCH);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index d0d25bc..3270119 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -292,7 +292,8 @@ public class JobMasterTest extends TestLogger {
                     JobMasterConfiguration.fromConfiguration(configuration);
 
             final SchedulerNGFactory schedulerNGFactory =
-                    SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
+                    SchedulerNGFactoryFactory.createSchedulerNGFactory(
+                            configuration, jobGraph.getJobType());
 
             final JobMaster jobMaster =
                     new JobMaster(
@@ -302,7 +303,8 @@ public class JobMasterTest extends TestLogger {
                             jmResourceId,
                             jobGraph,
                             haServices,
-                            SlotPoolServiceFactory.fromConfiguration(configuration),
+                            SlotPoolServiceFactory.fromConfiguration(
+                                    configuration, jobGraph.getJobType()),
                             jobManagerSharedServices,
                             heartbeatServices,
                             UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
index d97778e..bb46435 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
@@ -553,7 +553,7 @@ public class DefaultDeclarativeSlotPoolTest extends TestLogger {
     }
 
     @Nonnull
-    private static Collection<SlotOffer> createSlotOffersForResourceRequirements(
+    public static Collection<SlotOffer> createSlotOffersForResourceRequirements(
             ResourceCounter resourceRequirements) {
         Collection<SlotOffer> slotOffers = new ArrayList<>();
         int slotIndex = 0;
@@ -608,7 +608,7 @@ public class DefaultDeclarativeSlotPoolTest extends TestLogger {
     }
 
     @Nonnull
-    static Collection<SlotOffer> offerSlots(
+    public static Collection<SlotOffer> offerSlots(
             DeclarativeSlotPool slotPool, Collection<SlotOffer> slotOffers) {
         return slotPool.offerSlots(
                 slotOffers, new LocalTaskManagerLocation(), createTaskManagerGateway(null), 0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
index 26bbc09..c5c10ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
@@ -195,7 +195,8 @@ public class JobMasterBuilder {
                 highAvailabilityServices,
                 slotPoolFactory != null
                         ? slotPoolFactory
-                        : SlotPoolServiceFactory.fromConfiguration(configuration),
+                        : SlotPoolServiceFactory.fromConfiguration(
+                                configuration, jobGraph.getJobType()),
                 jobManagerSharedServices,
                 heartbeatServices,
                 UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
@@ -204,7 +205,8 @@ public class JobMasterBuilder {
                 JobMasterBuilder.class.getClassLoader(),
                 schedulerFactory != null
                         ? schedulerFactory
-                        : SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration),
+                        : SchedulerNGFactoryFactory.createSchedulerNGFactory(
+                                configuration, jobGraph.getJobType()),
                 shuffleMaster,
                 partitionTrackerFactory,
                 executionDeploymentTracker,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerBuilder.java
new file mode 100644
index 0000000..192db57
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerBuilder.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+/** Builder for {@link DeclarativeScheduler}. */
+public class DeclarativeSchedulerBuilder {
+    private static final Time DEFAULT_TIMEOUT = Time.seconds(300);
+
+    private final JobGraph jobGraph;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    private Executor ioExecutor = TestingUtils.defaultExecutor();
+    private Configuration jobMasterConfiguration = new Configuration();
+    private ScheduledExecutorService futureExecutor = TestingUtils.defaultExecutor();
+    private ClassLoader userCodeLoader = ClassLoader.getSystemClassLoader();
+    private CheckpointRecoveryFactory checkpointRecoveryFactory =
+            new StandaloneCheckpointRecoveryFactory();
+    private DeclarativeSlotPool declarativeSlotPool;
+    private Time rpcTimeout = DEFAULT_TIMEOUT;
+    private BlobWriter blobWriter = VoidBlobWriter.getInstance();
+    private JobManagerJobMetricGroup jobManagerJobMetricGroup =
+            UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
+    private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE;
+    private JobMasterPartitionTracker partitionTracker = NoOpJobMasterPartitionTracker.INSTANCE;
+    private RestartBackoffTimeStrategy restartBackoffTimeStrategy =
+            NoRestartBackoffTimeStrategy.INSTANCE;
+    private FatalErrorHandler fatalErrorHandler =
+            error ->
+                    FatalExitExceptionHandler.INSTANCE.uncaughtException(
+                            Thread.currentThread(), error);
+    private JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC, ignoredD) -> {};
+
+    public DeclarativeSchedulerBuilder(
+            final JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor) {
+        this.jobGraph = jobGraph;
+        this.mainThreadExecutor = mainThreadExecutor;
+
+        this.declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        DEFAULT_TIMEOUT,
+                        rpcTimeout);
+    }
+
+    public DeclarativeSchedulerBuilder setIoExecutor(final Executor ioExecutor) {
+        this.ioExecutor = ioExecutor;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setJobMasterConfiguration(
+            final Configuration jobMasterConfiguration) {
+        this.jobMasterConfiguration = jobMasterConfiguration;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setFutureExecutor(
+            final ScheduledExecutorService futureExecutor) {
+        this.futureExecutor = futureExecutor;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setUserCodeLoader(final ClassLoader userCodeLoader) {
+        this.userCodeLoader = userCodeLoader;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setCheckpointRecoveryFactory(
+            final CheckpointRecoveryFactory checkpointRecoveryFactory) {
+        this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setRpcTimeout(final Time rpcTimeout) {
+        this.rpcTimeout = rpcTimeout;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setBlobWriter(final BlobWriter blobWriter) {
+        this.blobWriter = blobWriter;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setJobManagerJobMetricGroup(
+            final JobManagerJobMetricGroup jobManagerJobMetricGroup) {
+        this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setShuffleMaster(final ShuffleMaster<?> shuffleMaster) {
+        this.shuffleMaster = shuffleMaster;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setPartitionTracker(
+            final JobMasterPartitionTracker partitionTracker) {
+        this.partitionTracker = partitionTracker;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setDeclarativeSlotPool(
+            DeclarativeSlotPool declarativeSlotPool) {
+        this.declarativeSlotPool = declarativeSlotPool;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setRestartBackoffTimeStrategy(
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+        this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setFatalErrorHandler(FatalErrorHandler fatalErrorHandler) {
+        this.fatalErrorHandler = fatalErrorHandler;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setJobStatusListener(JobStatusListener jobStatusListener) {
+        this.jobStatusListener = jobStatusListener;
+        return this;
+    }
+
+    public DeclarativeScheduler build() throws Exception {
+        return new DeclarativeScheduler(
+                jobGraph,
+                jobMasterConfiguration,
+                declarativeSlotPool,
+                futureExecutor,
+                ioExecutor,
+                userCodeLoader,
+                checkpointRecoveryFactory,
+                rpcTimeout,
+                blobWriter,
+                jobManagerJobMetricGroup,
+                shuffleMaster,
+                partitionTracker,
+                restartBackoffTimeStrategy,
+                new DefaultExecutionDeploymentTracker(),
+                System.currentTimeMillis(),
+                mainThreadExecutor,
+                fatalErrorHandler,
+                jobStatusListener);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerClusterITCase.java
new file mode 100644
index 0000000..f7e528d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerClusterITCase.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testtasks.OnceBlockingNoOpInvokable;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * This class contains integration tests for the declarative scheduler which start a {@link
+ * org.apache.flink.runtime.minicluster.MiniCluster} per test case.
+ */
+public class DeclarativeSchedulerClusterITCase extends TestLogger {
+
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int NUMBER_TASK_MANAGERS = 2;
+    private static final int PARALLELISM = NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS;
+
+    private final Configuration configuration = createConfiguration();
+
+    @Rule
+    public final MiniClusterResource miniClusterResource =
+            new MiniClusterResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(configuration)
+                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+                            .build());
+
+    private Configuration createConfiguration() {
+        final Configuration configuration = new Configuration();
+
+        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Declarative);
+        configuration.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return configuration;
+    }
+
+    @Test
+    public void testAutomaticScaleDownInCaseOfLostSlots() throws InterruptedException, IOException {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);
+
+        miniCluster.submitJob(jobGraph).join();
+        final CompletableFuture<JobResult> resultFuture =
+                miniCluster.requestJobResult(jobGraph.getJobID());
+
+        OnceBlockingNoOpInvokable.waitUntilOpsAreRunning();
+
+        miniCluster.terminateTaskManager(0);
+
+        final JobResult jobResult = resultFuture.join();
+
+        assertTrue(jobResult.isSuccess());
+    }
+
+    @Test
+    public void testAutomaticScaleUp() throws Exception {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        int targetInstanceCount = NUMBER_SLOTS_PER_TASK_MANAGER * (NUMBER_TASK_MANAGERS + 1);
+        final JobGraph jobGraph = createBlockingJobGraph(targetInstanceCount);
+
+        // initially only expect NUMBER_TASK_MANAGERS
+        OnceBlockingNoOpInvokable.resetFor(NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS);
+
+        log.info(
+                "Submitting job with parallelism of "
+                        + targetInstanceCount
+                        + ", to a cluster with only one TM.");
+        miniCluster.submitJob(jobGraph).join();
+        final CompletableFuture<JobResult> jobResultFuture =
+                miniCluster.requestJobResult(jobGraph.getJobID());
+
+        OnceBlockingNoOpInvokable.waitUntilOpsAreRunning();
+
+        log.info("Start additional TaskManager to scale up to the full parallelism.");
+        OnceBlockingNoOpInvokable.resetInstanceCount(); // we expect a restart
+        OnceBlockingNoOpInvokable.resetFor(targetInstanceCount);
+        miniCluster.startTaskManager();
+
+        log.info("Waiting until Invokable is running with higher parallelism");
+        OnceBlockingNoOpInvokable.waitUntilOpsAreRunning();
+
+        assertEquals(targetInstanceCount, OnceBlockingNoOpInvokable.getInstanceCount());
+
+        assertTrue(jobResultFuture.join().isSuccess());
+    }
+
+    private JobGraph createBlockingJobGraph(int parallelism) throws IOException {
+        final JobVertex blockingOperator = new JobVertex("Blocking operator");
+
+        OnceBlockingNoOpInvokable.resetFor(parallelism);
+        blockingOperator.setInvokableClass(OnceBlockingNoOpInvokable.class);
+
+        blockingOperator.setParallelism(parallelism);
+
+        final JobGraph jobGraph = new JobGraph("Blocking job.", blockingOperator);
+        jobGraph.setJobType(JobType.STREAMING);
+
+        ExecutionConfig executionConfig = new ExecutionConfig();
+        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
+        jobGraph.setExecutionConfig(executionConfig);
+
+        return jobGraph;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSimpleITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSimpleITCase.java
new file mode 100644
index 0000000..290ae9c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSimpleITCase.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/** Integration tests for the declarative scheduler. */
+public class DeclarativeSchedulerSimpleITCase extends TestLogger {
+
+    private static final int NUMBER_TASK_MANAGERS = 2;
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int PARALLELISM = 10;
+
+    private static final Configuration configuration = getConfiguration();
+
+    private static Configuration getConfiguration() {
+        final Configuration configuration = new Configuration();
+
+        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Declarative);
+        configuration.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return configuration;
+    }
+
+    @ClassRule
+    public static final MiniClusterResource MINI_CLUSTER_RESOURCE =
+            new MiniClusterResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(configuration)
+                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .build());
+
+    @Test
+    public void testSchedulingOfSimpleJob() throws Exception {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+        final JobGraph jobGraph = createJobGraph();
+
+        miniCluster.submitJob(jobGraph).join();
+
+        final JobResult jobResult = miniCluster.requestJobResult(jobGraph.getJobID()).join();
+
+        final JobExecutionResult jobExecutionResult =
+                jobResult.toJobExecutionResult(getClass().getClassLoader());
+
+        assertTrue(jobResult.isSuccess());
+    }
+
+    private JobGraph createJobGraph() {
+        final JobVertex source = new JobVertex("Source");
+        source.setInvokableClass(NoOpInvokable.class);
+        source.setParallelism(PARALLELISM);
+
+        final JobVertex sink = new JobVertex("sink");
+        sink.setInvokableClass(NoOpInvokable.class);
+        sink.setParallelism(PARALLELISM);
+
+        sink.connectNewDataSetAsInput(
+                source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+        return new JobGraph("Simple job", source, sink);
+    }
+
+    @Test
+    public void testGlobalFailoverIfTaskFails() throws IOException {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+        final JobGraph jobGraph = createOnceFailingJobGraph();
+
+        miniCluster.submitJob(jobGraph).join();
+
+        final JobResult jobResult = miniCluster.requestJobResult(jobGraph.getJobID()).join();
+
+        assertTrue(jobResult.isSuccess());
+    }
+
+    private JobGraph createOnceFailingJobGraph() throws IOException {
+        final JobVertex onceFailingOperator = new JobVertex("Once failing operator");
+
+        OnceFailingInvokable.reset();
+        onceFailingOperator.setInvokableClass(OnceFailingInvokable.class);
+
+        onceFailingOperator.setParallelism(1);
+        final JobGraph jobGraph = new JobGraph("Once failing job", onceFailingOperator);
+        jobGraph.setJobType(JobType.STREAMING);
+        ExecutionConfig executionConfig = new ExecutionConfig();
+        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
+        jobGraph.setExecutionConfig(executionConfig);
+        return jobGraph;
+    }
+
+    /** Once failing {@link AbstractInvokable}. */
+    public static final class OnceFailingInvokable extends AbstractInvokable {
+        private static volatile boolean hasFailed = false;
+
+        /**
+         * Create an Invokable task and set its environment.
+         *
+         * @param environment The environment assigned to this invokable.
+         */
+        public OnceFailingInvokable(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            if (!hasFailed && getIndexInSubtaskGroup() == 0) {
+                hasFailed = true;
+                throw new FlinkRuntimeException("Test failure.");
+            }
+        }
+
+        private static void reset() {
+            hasFailed = false;
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSlotSharingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSlotSharingITCase.java
new file mode 100644
index 0000000..06b993b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerSlotSharingITCase.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/** Slot sharing tests for the declarative scheduler. */
+public class DeclarativeSchedulerSlotSharingITCase extends TestLogger {
+
+    private static final int NUMBER_TASK_MANAGERS = 1;
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 1;
+    private static final int PARALLELISM = 10;
+
+    private static Configuration getConfiguration() {
+        final Configuration configuration = new Configuration();
+
+        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Declarative);
+        configuration.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return configuration;
+    }
+
+    @ClassRule
+    public static final MiniClusterResource MINI_CLUSTER_RESOURCE =
+            new MiniClusterResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getConfiguration())
+                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .build());
+
+    @Test
+    public void testSchedulingOfJobRequiringSlotSharing() throws Exception {
+        // run job multiple times to ensure slots are cleaned up properly
+        runJob();
+        runJob();
+    }
+
+    private void runJob() throws Exception {
+        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+        final JobGraph jobGraph = createJobGraph();
+
+        miniCluster.submitJob(jobGraph).join();
+
+        final JobResult jobResult = miniCluster.requestJobResult(jobGraph.getJobID()).join();
+
+        // this throws an exception if the job failed
+        jobResult.toJobExecutionResult(getClass().getClassLoader());
+
+        assertTrue(jobResult.isSuccess());
+    }
+
+    /**
+     * Returns a JobGraph that can only run on a single slot if slot sharing works, because both
+     * vertices have parallelism {@code PARALLELISM} but share one slot sharing group.
+     */
+    private static JobGraph createJobGraph() {
+        final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+
+        final JobVertex source = new JobVertex("Source");
+        source.setInvokableClass(NoOpInvokable.class);
+        source.setParallelism(PARALLELISM);
+        source.setSlotSharingGroup(slotSharingGroup);
+
+        final JobVertex sink = new JobVertex("sink");
+        sink.setInvokableClass(NoOpInvokable.class);
+        sink.setParallelism(PARALLELISM);
+        sink.setSlotSharingGroup(slotSharingGroup);
+
+        sink.connectNewDataSetAsInput(
+                source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+        final JobGraph jobGraph = new JobGraph("Simple job", source, sink);
+        jobGraph.setJobType(JobType.STREAMING);
+
+        return jobGraph;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
new file mode 100644
index 0000000..44dc879
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
+import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
+import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.offerSlots;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link DeclarativeScheduler}. */
+public class DeclarativeSchedulerTest extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+    private static final JobVertex JOB_VERTEX;
+
+    static {
+        JOB_VERTEX = new JobVertex("v1");
+        JOB_VERTEX.setParallelism(PARALLELISM);
+        JOB_VERTEX.setInvokableClass(AbstractInvokable.class);
+    }
+
+    private final ManuallyTriggeredComponentMainThreadExecutor mainThreadExecutor =
+            new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
+
+    @Test
+    public void testInitialState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        assertThat(scheduler.getState(), instanceOf(Created.class));
+    }
+
+    @Test
+    public void testIsState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        final State state = scheduler.getState();
+
+        assertThat(scheduler.isState(state), is(true));
+        assertThat(scheduler.isState(new DummyState()), is(false));
+    }
+
+    @Test
+    public void testRunIfState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(scheduler.getState(), () -> ran.set(true));
+        assertThat(ran.get(), is(true));
+    }
+
+    @Test
+    public void testRunIfStateWithStateMismatch() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(new DummyState(), () -> ran.set(true));
+        assertThat(ran.get(), is(false));
+    }
+
+    @Test
+    public void testHasEnoughResourcesReturnsFalseIfUnsatisfied() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        scheduler.startScheduling();
+
+        final ResourceCounter resourceRequirement =
+                ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), is(false));
+    }
+
+    @Test
+    public void testHasEnoughResourcesReturnsTrueIfSatisfied() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        final ResourceCounter resourceRequirement =
+                ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
+
+        offerSlots(
+                declarativeSlotPool, createSlotOffersForResourceRequirements(resourceRequirement));
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), is(true));
+    }
+
+    @Test
+    public void testHasEnoughResourcesUsesUnmatchedSlotsAsUnknown() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        final int numRequiredSlots = 1;
+        final ResourceCounter requiredResources =
+                ResourceCounter.withResource(ResourceProfile.UNKNOWN, numRequiredSlots);
+        final ResourceCounter providedResources =
+                ResourceCounter.withResource(
+                        ResourceProfile.newBuilder().setCpuCores(1).build(), numRequiredSlots);
+
+        offerSlots(declarativeSlotPool, createSlotOffersForResourceRequirements(providedResources));
+
+        assertThat(scheduler.hasEnoughResources(requiredResources), is(true));
+    }
+
+    @Test
+    public void testExecutionGraphGenerationWithAvailableResources() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        final int numAvailableSlots = 1;
+
+        offerSlots(
+                declarativeSlotPool,
+                createSlotOffersForResourceRequirements(
+                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, numAvailableSlots)));
+
+        final ExecutionGraph executionGraph =
+                scheduler.createExecutionGraphWithAvailableResources();
+
+        assertThat(
+                executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism(),
+                is(numAvailableSlots));
+    }
+
+    @Test
+    public void testFatalErrorsForwardedToFatalErrorHandler() throws Exception {
+        final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor)
+                        .setFatalErrorHandler(fatalErrorHandler)
+                        .build();
+
+        final RuntimeException exception = new RuntimeException();
+
+        scheduler.runIfState(
+                scheduler.getState(),
+                () -> {
+                    throw exception;
+                });
+
+        assertThat(fatalErrorHandler.getException(), is(exception));
+    }
+
+    // ---------------------------------------------------------------------------------------------
+    // State transition tests
+    // ---------------------------------------------------------------------------------------------
+
+    @Test
+    public void testStartSchedulingTransitionsToWaitingForResources() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        scheduler.startScheduling();
+
+        assertThat(scheduler.getState(), instanceOf(WaitingForResources.class));
+    }
+
+    @Test
+    public void testStartSchedulingSetsResourceRequirements() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        assertThat(
+                declarativeSlotPool.getResourceRequirements(),
+                contains(ResourceRequirement.create(ResourceProfile.UNKNOWN, PARALLELISM)));
+    }
+
+    /** Tests that the listener for new slots is properly set up. */
+    @Test
+    public void testResourceAcquisitionTriggersJobExecution() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        offerSlots(
+                declarativeSlotPool,
+                createSlotOffersForResourceRequirements(
+                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, PARALLELISM)));
+
+        assertThat(scheduler.getState(), instanceOf(Executing.class));
+    }
+
+    @Test
+    public void testGoToFinished() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build();
+
+        scheduler.goToFinished(archivedExecutionGraph);
+
+        assertThat(scheduler.getState(), instanceOf(Finished.class));
+    }
+
+    @Test
+    public void testGoToFinishedNotifiesJobListener() throws Exception {
+        final AtomicReference<JobStatus> jobStatusUpdate = new AtomicReference<>();
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor)
+                        .setJobStatusListener(
+                                (jobId, newJobStatus, timestamp, error) ->
+                                        jobStatusUpdate.set(newJobStatus))
+                        .build();
+
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build();
+
+        scheduler.goToFinished(archivedExecutionGraph);
+
+        assertThat(jobStatusUpdate.get(), is(archivedExecutionGraph.getState()));
+    }
+
+    @Test
+    public void testGoToFinishedShutsDownCheckpointingComponents() throws Exception {
+        final CompletableFuture<JobStatus> completedCheckpointStoreShutdownFuture =
+                new CompletableFuture<>();
+        final CompletedCheckpointStore completedCheckpointStore =
+                new TestingCompletedCheckpointStore(completedCheckpointStoreShutdownFuture);
+
+        final CompletableFuture<JobStatus> checkpointIdCounterShutdownFuture =
+                new CompletableFuture<>();
+        final CheckpointIDCounter checkpointIdCounter =
+                new TestingCheckpointIDCounter(checkpointIdCounterShutdownFuture);
+
+        final JobGraph jobGraph = createJobGraph();
+        // checkpointing components are only created if checkpointing is enabled
+        jobGraph.setSnapshotSettings(
+                new JobCheckpointingSettings(
+                        CheckpointCoordinatorConfiguration.builder().build(), null));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setCheckpointRecoveryFactory(
+                                new TestingCheckpointRecoveryFactory(
+                                        completedCheckpointStore, checkpointIdCounter))
+                        .build();
+
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build();
+
+        scheduler.goToFinished(archivedExecutionGraph);
+
+        assertThat(completedCheckpointStoreShutdownFuture.get(), is(JobStatus.FAILED));
+        assertThat(checkpointIdCounterShutdownFuture.get(), is(JobStatus.FAILED));
+    }
+
+    @Test
+    public void testTransitionToStateCallsOnEnter() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        final LifecycleMethodCapturingState firstState = new LifecycleMethodCapturingState();
+
+        scheduler.transitionToState(firstState);
+        assertThat(firstState.onEnterCalled, is(true));
+        assertThat(firstState.onLeaveCalled, is(false));
+        firstState.reset();
+    }
+
+    @Test
+    public void testTransitionToStateCallsOnLeave() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        final LifecycleMethodCapturingState firstState = new LifecycleMethodCapturingState();
+        final DummyState secondState = new DummyState();
+
+        scheduler.transitionToState(firstState);
+        firstState.reset();
+
+        scheduler.transitionToState(secondState);
+        assertThat(firstState.onEnterCalled, is(false));
+        assertThat(firstState.onLeaveCalled, is(true));
+        assertThat(firstState.onLeaveNewStateArgument, sameInstance(secondState.getClass()));
+    }
+
+    @Test
+    public void testTransitionToStateIgnoresDuplicateTransitions() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        final LifecycleMethodCapturingState state = new LifecycleMethodCapturingState();
+        scheduler.transitionToState(state);
+        state.reset();
+
+        // attempt to transition into the state we are already in
+        scheduler.transitionToState(state);
+
+        assertThat(state.onEnterCalled, is(false));
+        assertThat(state.onLeaveCalled, is(false));
+    }
+
+    // ---------------------------------------------------------------------------------------------
+    // Failure handling tests
+    // ---------------------------------------------------------------------------------------------
+
+    @Test
+    public void testHowToHandleFailureRejectedByStrategy() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor)
+                        .setRestartBackoffTimeStrategy(NoRestartBackoffTimeStrategy.INSTANCE)
+                        .build();
+
+        assertThat(scheduler.howToHandleFailure(new Exception("test")).canRestart(), is(false));
+    }
+
+    @Test
+    public void testHowToHandleFailureAllowedByStrategy() throws Exception {
+        final TestRestartBackoffTimeStrategy restartBackoffTimeStrategy =
+                new TestRestartBackoffTimeStrategy(true, 1234);
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor)
+                        .setRestartBackoffTimeStrategy(restartBackoffTimeStrategy)
+                        .build();
+
+        final Executing.FailureResult failureResult =
+                scheduler.howToHandleFailure(new Exception("test"));
+
+        assertThat(failureResult.canRestart(), is(true));
+        assertThat(
+                failureResult.getBackoffTime().toMillis(),
+                is(restartBackoffTimeStrategy.getBackoffTime()));
+    }
+
+    @Test
+    public void testHowToHandleFailureUnrecoverableFailure() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        assertThat(
+                scheduler
+                        .howToHandleFailure(new SuppressRestartsException(new Exception("test")))
+                        .canRestart(),
+                is(false));
+    }
+
+    // ---------------------------------------------------------------------------------------------
+    // Illegal state behavior tests
+    // ---------------------------------------------------------------------------------------------
+
+    @Test
+    public void testTriggerSavepointFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.triggerSavepoint("some directory", false),
+                futureFailedWith(CheckpointException.class));
+    }
+
+    @Test
+    public void testStopWithSavepointFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.stopWithSavepoint("some directory", false),
+                futureFailedWith(CheckpointException.class));
+    }
+
+    @Test(expected = TaskNotRunningException.class)
+    public void testDeliverOperatorEventToCoordinatorFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        scheduler.deliverOperatorEventToCoordinator(
+                new ExecutionAttemptID(), new OperatorID(), new TestOperatorEvent());
+    }
+
+    @Test
+    public void testDeliverCoordinationRequestToCoordinatorFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.deliverCoordinationRequestToCoordinator(
+                        new OperatorID(), new CoordinationRequest() {}),
+                futureFailedWith(FlinkException.class));
+    }
+
+    @Test
+    public void testUpdateTaskExecutionStateReturnsFalseInIllegalState() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.updateTaskExecutionState(
+                        new TaskExecutionStateTransition(
+                                new TaskExecutionState(
+                                        jobGraph.getJobID(),
+                                        new ExecutionAttemptID(),
+                                        ExecutionState.FAILED))),
+                is(false));
+    }
+
+    @Test(expected = IOException.class)
+    public void testRequestNextInputSplitFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        scheduler.requestNextInputSplit(JOB_VERTEX.getID(), new ExecutionAttemptID());
+    }
+
+    @Test(expected = PartitionProducerDisposedException.class)
+    public void testRequestPartitionStateFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
+
+        scheduler.requestPartitionState(new IntermediateDataSetID(), new ResultPartitionID());
+    }
+
+    // ---------------------------------------------------------------------------------------------
+    // Utils
+    // ---------------------------------------------------------------------------------------------
+
+    private static JobGraph createJobGraph() {
+        final JobGraph jobGraph = new JobGraph(JOB_VERTEX);
+        jobGraph.setJobType(JobType.STREAMING);
+        return jobGraph;
+    }
+
+    private static class LifecycleMethodCapturingState extends DummyState {
+        boolean onEnterCalled = false;
+        boolean onLeaveCalled = false;
+        @Nullable Class<? extends State> onLeaveNewStateArgument = null;
+
+        void reset() {
+            onEnterCalled = false;
+            onLeaveCalled = false;
+            onLeaveNewStateArgument = null;
+        }
+
+        @Override
+        public void onEnter() {
+            onEnterCalled = true;
+        }
+
+        @Override
+        public void onLeave(Class<? extends State> newState) {
+            onLeaveCalled = true;
+            onLeaveNewStateArgument = newState;
+        }
+    }
+
+    private static class DummyState implements State {
+
+        @Override
+        public void cancel() {}
+
+        @Override
+        public void suspend(Throwable cause) {}
+
+        @Override
+        public JobStatus getJobStatus() {
+            return null;
+        }
+
+        @Override
+        public ArchivedExecutionGraph getJob() {
+            return null;
+        }
+
+        @Override
+        public void handleGlobalFailure(Throwable cause) {}
+
+        @Override
+        public Logger getLogger() {
+            return null;
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
index bf83169..621afa5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
@@ -36,6 +36,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /** Tests for the {@link SharedSlot}. */
 public class SharedSlotTest extends TestLogger {
@@ -194,6 +195,16 @@ public class SharedSlotTest extends TestLogger {
                         }));
 
         sharedSlot.release(new Exception("test"));
+
+        // if all logical slots were released, and the sharedSlot no longer allows the allocation of
+        // logical slots, then the slot release was completed
+        assertThat(logicalSlot1.isAlive(), is(false));
+        assertThat(logicalSlot2.isAlive(), is(false));
+        try {
+            sharedSlot.allocateLogicalSlot();
+            fail("Allocation of logical slot should have failed because the slot was released.");
+        } catch (IllegalStateException expected) {
+        }
     }
 
     @Test(expected = IllegalStateException.class)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java
new file mode 100644
index 0000000..c053104
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
 * Simple {@link AbstractInvokable} which blocks the first time it is run. Moreover, one can wait
 * until n instances of this invokable are running by calling {@link #waitUntilOpsAreRunning()}.
 *
 * <p>Before using this class it is important to call {@link #resetFor}.
 */
public class OnceBlockingNoOpInvokable extends AbstractInvokable {

    // total number of invoke() calls across all instances; only cleared via resetInstanceCount()
    private static final AtomicInteger instanceCount = new AtomicInteger(0);

    // counted down once per started invocation; replaced by resetFor(parallelism)
    private static volatile CountDownLatch numOpsPending = new CountDownLatch(1);

    // true until one invocation has completed; while true, invoke() blocks
    private static volatile boolean isBlocking = true;

    // monitor guarding 'running' transitions; also used for wait/notify between invoke and cancel
    private final Object lock = new Object();

    private volatile boolean running = true;

    public OnceBlockingNoOpInvokable(Environment environment) {
        super(environment);
    }

    /**
     * Registers this invocation (counter + latch), then blocks until either the blocking phase has
     * ended or {@link #cancel()} is called.
     */
    @Override
    public void invoke() throws Exception {

        instanceCount.incrementAndGet();
        numOpsPending.countDown();

        synchronized (lock) {
            // wait until cancel() clears 'running' or the blocking phase has been ended
            while (isBlocking && running) {
                lock.wait();
            }
        }

        // end the blocking phase so that subsequent invocations run through without blocking
        isBlocking = false;
    }

    /** Wakes up a blocked {@link #invoke()} call. */
    @Override
    public void cancel() throws Exception {
        synchronized (lock) {
            running = false;
            lock.notifyAll();
        }
    }

    /** Blocks until as many invocations are running as configured via {@link #resetFor}. */
    public static void waitUntilOpsAreRunning() throws InterruptedException {
        numOpsPending.await();
    }

    public static int getInstanceCount() {
        return instanceCount.get();
    }

    public static void resetInstanceCount() {
        instanceCount.set(0);
    }

    /**
     * Re-arms the static state for a new test run with the given parallelism.
     *
     * <p>NOTE(review): this does not reset the instance counter; call {@link #resetInstanceCount()}
     * separately if the counter is inspected afterwards.
     */
    public static void resetFor(int parallelism) {
        numOpsPending = new CountDownLatch(parallelism);
        isBlocking = true;
    }
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java
new file mode 100644
index 0000000..23dacd4
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import static org.junit.Assume.assumeTrue;
+
+/** Integration tests for the declarative scheduler. */
+public class DeclarativeSchedulerITCase extends TestLogger {
+
+    private static final int NUMBER_TASK_MANAGERS = 2;
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int PARALLELISM = NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS;
+
+    private static final Configuration configuration = getConfiguration();
+
+    private static Configuration getConfiguration() {
+        final Configuration configuration = new Configuration();
+
+        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Declarative);
+        configuration.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
+
+        return configuration;
+    }
+
+    @ClassRule
+    public static final MiniClusterResource MINI_CLUSTER_WITH_CLIENT_RESOURCE =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getConfiguration())
+                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                            .build());
+
+    /** Tests that the declarative scheduler can recover stateful operators. */
+    @Test
+    public void testGlobalFailoverCanRecoverState() throws Exception {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        env.enableCheckpointing(20L, CheckpointingMode.EXACTLY_ONCE);
+        final DataStreamSource<Integer> input = env.addSource(new SimpleSource());
+
+        input.addSink(new DiscardingSink<>());
+
+        env.execute();
+    }
+
+    /**
+     * Simple source which fails once after a successful checkpoint has been taken. Upon recovery
+     * the source will immediately terminate.
+     */
+    public static final class SimpleSource extends RichParallelSourceFunction<Integer>
+            implements CheckpointListener, CheckpointedFunction {
+
+        private static final ListStateDescriptor<Boolean> unionStateListDescriptor =
+                new ListStateDescriptor<>("state", Boolean.class);
+
+        private volatile boolean running = true;
+
+        @Nullable private ListState<Boolean> unionListState = null;
+
+        private boolean hasFailedBefore = false;
+
+        private boolean fail = false;
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            while (running && !hasFailedBefore) {
+                synchronized (ctx.getCheckpointLock()) {
+                    ctx.collect(getRuntimeContext().getIndexOfThisSubtask());
+
+                    Thread.sleep(5L);
+                }
+
+                if (fail) {
+                    throw new FlinkException("Test failure.");
+                }
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            fail = true;
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {}
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            unionListState =
+                    context.getOperatorStateStore().getUnionListState(unionStateListDescriptor);
+
+            for (Boolean previousState : unionListState.get()) {
+                hasFailedBefore |= previousState;
+            }
+
+            unionListState.clear();
+            unionListState.add(true);
+        }
+    }
+}


[flink] 10/12: [FLINK-21100][coordination] Pass FatalErrorHandler to scheduler factory

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3c463416d9b1303d7b54ec3fccf8f2c0d9f436ab
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Feb 9 21:22:37 2021 +0100

    [FLINK-21100][coordination] Pass FatalErrorHandler to scheduler factory
---
 .../src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java     | 1 +
 .../org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java     | 2 ++
 .../java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java     | 2 ++
 .../java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java | 2 ++
 .../org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java   | 2 ++
 5 files changed, 9 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6776462..844d144 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -356,6 +356,7 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>
                         executionDeploymentTracker,
                         initializationTimestamp,
                         getMainThreadExecutor(),
+                        fatalErrorHandler,
                         jobStatusListener);
 
         return scheduler;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 27e7b9b..c888f65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
 import org.slf4j.Logger;
@@ -66,6 +67,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
             final ExecutionDeploymentTracker executionDeploymentTracker,
             long initializationTimestamp,
             final ComponentMainThreadExecutor mainThreadExecutor,
+            final FatalErrorHandler fatalErrorHandler,
             final JobStatusListener jobStatusListener)
             throws Exception {
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
index efb972a..1707779 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
 import org.slf4j.Logger;
@@ -58,6 +59,7 @@ public interface SchedulerNGFactory {
             ExecutionDeploymentTracker executionDeploymentTracker,
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
+            FatalErrorHandler fatalErrorHandler,
             JobStatusListener jobStatusListener)
             throws Exception;
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
index b83d80b..8485fe8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
 import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
@@ -103,6 +104,7 @@ public class JobMasterSchedulerTest extends TestLogger {
                 ExecutionDeploymentTracker executionDeploymentTracker,
                 long initializationTimestamp,
                 ComponentMainThreadExecutor mainThreadExecutor,
+                FatalErrorHandler fatalErrorHandler,
                 JobStatusListener jobStatusListener) {
             return TestingSchedulerNG.newBuilder()
                     .setStartSchedulingRunnable(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
index d066aec..cd3ab44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
 import org.slf4j.Logger;
@@ -66,6 +67,7 @@ public class TestingSchedulerNGFactory implements SchedulerNGFactory {
             ExecutionDeploymentTracker executionDeploymentTracker,
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
+            FatalErrorHandler fatalErrorHandler,
             JobStatusListener jobStatusListener)
             throws Exception {
         return schedulerNG;


[flink] 08/12: [FLINK-21100][coordination] Adjust SharedSlot lifecycle

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a56ca8610125548593226da0c85426ac40c7dbe6
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Feb 8 15:09:52 2021 +0100

    [FLINK-21100][coordination] Adjust SharedSlot lifecycle
    
    The external-release callback is now only run once the last logical slot has been returned.
    
    In practice the use-case for the callback is to bind a release call to the return of the last logical slot.
    The current behavior however muddies this contract since it may also be called from #release; leading to cases where #release is called twice on a single slot.
    
    This case got even worse when the slot was released before all logical slots have been returned; in this case all allocated logical slots are automatically being returned, triggering the callback, triggering another release call while the first one hasn't even finished yet.
---
 .../declarative/allocator/SharedSlot.java          | 39 +++++++++---
 .../declarative/allocator/SharedSlotTest.java      | 71 +++++++++++++++++++++-
 2 files changed, 98 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
index 90cce74..5b41bad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
@@ -34,7 +34,20 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-/** Shared slot implementation for the declarative scheduler. */
+/**
+ * Shared slot implementation for the declarative scheduler.
+ *
+ * <p>The release process of a shared slot follows one of 2 code paths:
+ *
+ * <p>1) During normal execution all allocated logical slots will be returned, with the last
+ * return triggering the {@code externalReleaseCallback} which must eventually result in a
+ * {@link #release(Throwable)} call.
+ *
+ * <p>2) If the backing physical slot is lost (e.g., because the providing TaskManager crashed)
+ * {@link #release(Throwable)} is called without all logical slots having been returned. The runtime
+ * relies on this also triggering the release of all logical slots. This will not trigger the {@code
+ * externalReleaseCallback}.
+ */
 class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
     private static final Logger LOG = LoggerFactory.getLogger(SharedSlot.class);
 
@@ -94,7 +107,7 @@ class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
     public void returnLogicalSlot(LogicalSlot logicalSlot) {
         LOG.debug("Returning logical slot to shared slot ({})", physicalSlotRequestId);
         Preconditions.checkState(
-                state == State.ALLOCATED, "The shared slot has already been released.");
+                state != State.RELEASED, "The shared slot has already been released.");
 
         Preconditions.checkState(!logicalSlot.isAlive(), "Returned logic slot must not be alive.");
         Preconditions.checkState(
@@ -103,30 +116,35 @@ class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
         tryReleaseExternally();
     }
 
+    private void tryReleaseExternally() {
+        if (state == State.ALLOCATED && allocatedLogicalSlots.isEmpty()) {
+            LOG.debug("Release shared slot externally ({})", physicalSlotRequestId);
+            externalReleaseCallback.run();
+        }
+    }
+
     @Override
     public void release(Throwable cause) {
         LOG.debug("Release shared slot ({})", physicalSlotRequestId);
         Preconditions.checkState(
                 state == State.ALLOCATED, "The shared slot has already been released.");
 
+        // ensures that we won't call the external release callback if there are still logical slots
+        // to release
+        state = State.RELEASING;
+
         // copy the logical slot collection to avoid ConcurrentModificationException
         // if logical slot releases cause cancellation of other executions
         // which will try to call returnLogicalSlot and modify allocatedLogicalSlots collection
         final List<LogicalSlot> logicalSlotsToRelease =
                 new ArrayList<>(allocatedLogicalSlots.values());
         for (LogicalSlot allocatedLogicalSlot : logicalSlotsToRelease) {
+            // this will also cause the logical slot to be returned
             allocatedLogicalSlot.releaseSlot(cause);
         }
         allocatedLogicalSlots.clear();
-        tryReleaseExternally();
-    }
 
-    private void tryReleaseExternally() {
-        if (state != State.RELEASED && allocatedLogicalSlots.isEmpty()) {
-            state = State.RELEASED;
-            LOG.debug("Release shared slot externally ({})", physicalSlotRequestId);
-            externalReleaseCallback.run();
-        }
+        state = State.RELEASED;
     }
 
     @Override
@@ -136,6 +154,7 @@ class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
 
     private enum State {
         ALLOCATED,
+        RELEASING,
         RELEASED
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
index 8989a99..bf83169 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
@@ -27,7 +27,9 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -128,7 +130,7 @@ public class SharedSlotTest extends TestLogger {
     }
 
     @Test
-    public void testReleaseTriggersExternalRelease() {
+    public void testReleaseDoesNotTriggersExternalRelease() {
         final TestingPhysicalSlot physicalSlot = TestingPhysicalSlot.builder().build();
         final AtomicBoolean externalReleaseInitiated = new AtomicBoolean(false);
         final SharedSlot sharedSlot =
@@ -140,7 +142,7 @@ public class SharedSlotTest extends TestLogger {
 
         sharedSlot.release(new Exception("test"));
 
-        assertThat(externalReleaseInitiated.get(), is(true));
+        assertThat(externalReleaseInitiated.get(), is(false));
     }
 
     @Test
@@ -166,6 +168,48 @@ public class SharedSlotTest extends TestLogger {
         sharedSlot.allocateLogicalSlot();
     }
 
+    @Test
+    public void testCanReturnLogicalSlotDuringRelease() {
+        final TestingPhysicalSlot physicalSlot = TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+        final LogicalSlot logicalSlot1 = sharedSlot.allocateLogicalSlot();
+        final LogicalSlot logicalSlot2 = sharedSlot.allocateLogicalSlot();
+
+        // both slots try to release the other one, simulating that the failure of one execution due
+        // to the release also fails others
+        logicalSlot1.tryAssignPayload(
+                new TestLogicalSlotPayload(
+                        cause -> {
+                            if (logicalSlot2.isAlive()) {
+                                logicalSlot2.releaseSlot(cause);
+                            }
+                        }));
+        logicalSlot2.tryAssignPayload(
+                new TestLogicalSlotPayload(
+                        cause -> {
+                            if (logicalSlot1.isAlive()) {
+                                logicalSlot1.releaseSlot(cause);
+                            }
+                        }));
+
+        sharedSlot.release(new Exception("test"));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testCannotAllocateLogicalSlotDuringRelease() {
+        final TestingPhysicalSlot physicalSlot = TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+
+        final LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot();
+
+        logicalSlot.tryAssignPayload(
+                new TestLogicalSlotPayload(ignored -> sharedSlot.allocateLogicalSlot()));
+
+        sharedSlot.release(new Exception("test"));
+    }
+
     private static class TestPhysicalSlotPayload implements PhysicalSlot.Payload {
 
         @Override
@@ -176,4 +220,27 @@ public class SharedSlotTest extends TestLogger {
             return false;
         }
     }
+
+    private static class TestLogicalSlotPayload implements LogicalSlot.Payload {
+
+        private final Consumer<Throwable> failConsumer;
+
+        public TestLogicalSlotPayload() {
+            this.failConsumer = ignored -> {};
+        }
+
+        public TestLogicalSlotPayload(Consumer<Throwable> failConsumer) {
+            this.failConsumer = failConsumer;
+        }
+
+        @Override
+        public void fail(Throwable cause) {
+            failConsumer.accept(cause);
+        }
+
+        @Override
+        public CompletableFuture<?> getTerminalStateFuture() {
+            return new CompletableFuture<>();
+        }
+    }
 }


[flink] 06/12: [hotfix][coordination] Clarify visibility

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 81787d217ad106ac5655012be165f4d5fae50917
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Feb 10 16:24:48 2021 +0100

    [hotfix][coordination] Clarify visibility
    
    The handlers are only supposed to be accessible by sub-classes, while the execution graph is only visible for testing purposes.
---
 .../runtime/scheduler/declarative/StateWithExecutionGraph.java      | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StateWithExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StateWithExecutionGraph.java
index 1da441e..f34752e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StateWithExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StateWithExecutionGraph.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.scheduler.declarative;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.CheckpointingOptions;
@@ -106,15 +107,16 @@ abstract class StateWithExecutionGraph implements State {
                                 context.getMainThreadExecutor()));
     }
 
+    @VisibleForTesting
     ExecutionGraph getExecutionGraph() {
         return executionGraph;
     }
 
-    OperatorCoordinatorHandler getOperatorCoordinatorHandler() {
+    protected OperatorCoordinatorHandler getOperatorCoordinatorHandler() {
         return operatorCoordinatorHandler;
     }
 
-    ExecutionGraphHandler getExecutionGraphHandler() {
+    protected ExecutionGraphHandler getExecutionGraphHandler() {
         return executionGraphHandler;
     }
 


[flink] 04/12: [hotfix][coordination] Log that EG reached terminal state

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bdf213a2b0b4663a8f0f9069ce651acb87a6b443
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Feb 8 20:25:48 2021 +0100

    [hotfix][coordination] Log that EG reached terminal state
---
 .../java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java    | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 85a7cde..9e0f6d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1196,6 +1196,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
     }
 
     private void onTerminalState(JobStatus status) {
+        LOG.debug("ExecutionGraph {} reached terminal state {}.", getJobID(), status);
+
         try {
             CheckpointCoordinator coord = this.checkpointCoordinator;
             this.checkpointCoordinator = null;


[flink] 02/12: [hotfix][coordination] Remove leftover comment

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 88414e82ae7bf48bc171bf5f4a1734443a260c37
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Feb 8 20:23:14 2021 +0100

    [hotfix][coordination] Remove leftover comment
---
 .../org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java  | 2 --
 1 file changed, 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
index 6c2c5dc..798b50d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
@@ -48,8 +48,6 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
 
     private final SlotContext slotContext;
 
-    // null if the logical slot does not belong to a slot sharing group, otherwise non-null
-
     // locality of this slot wrt the requested preferred locations
     private final Locality locality;