You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/22 21:37:54 UTC

[flink] 01/03: [FLINK-24775][coordination] move JobStatus-related metrics out of ExecutionGraph

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f35a76c1e96a4cce51a7a5d335128769f0e196cc
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Nov 4 15:12:28 2021 +0100

    [FLINK-24775][coordination] move JobStatus-related metrics out of ExecutionGraph
---
 .../DefaultExecutionGraphBuilder.java              |   9 --
 .../flink/runtime/scheduler/JobStatusStore.java    |  50 ++++++++++
 .../flink/runtime/scheduler/SchedulerBase.java     |  20 +++-
 .../scheduler/adaptive/AdaptiveScheduler.java      |  29 +++---
 .../runtime/scheduler/JobStatusStoreTest.java      |  66 ++++++++++++
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 111 +++++++++++++++++++--
 6 files changed, 251 insertions(+), 34 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
index d1c6d6f..ff1af8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
@@ -37,9 +37,6 @@ import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
 import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategyFactoryLoader;
-import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
-import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
-import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
 import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -315,12 +312,6 @@ public class DefaultExecutionGraphBuilder {
                     checkpointsCleaner);
         }
 
-        // create all the metrics for the Execution Graph
-
-        metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
-        metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
-        metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));
-
         return executionGraph;
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/JobStatusStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/JobStatusStore.java
new file mode 100644
index 0000000..36fccb8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/JobStatusStore.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.JobStatusProvider;
+
+/** Listens for job status changes; exposes the current status and per-status timestamps. */
+public class JobStatusStore implements JobStatusListener, JobStatusProvider {
+
+    private final long[] stateTimestamps = new long[JobStatus.values().length]; // by ordinal
+    private JobStatus jobStatus = JobStatus.INITIALIZING; // until the first reported change
+
+    public JobStatusStore(long initializationTimestamp) { // stored as INITIALIZING timestamp
+        stateTimestamps[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;
+    }
+
+    @Override
+    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
+        jobStatus = newJobStatus; // jobId is not consulted
+        stateTimestamps[jobStatus.ordinal()] = timestamp; // replaces any earlier value
+    }
+
+    @Override
+    public JobStatus getState() {
+        return jobStatus;
+    }
+
+    @Override
+    public long getStatusTimestamp(JobStatus status) {
+        return stateTimestamps[status.ordinal()]; // 0 if no timestamp was recorded for status
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index ac9aca7..180921e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
@@ -49,8 +51,12 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.JobStatusProvider;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
+import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
+import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
+import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -579,14 +585,20 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
     @Override
     public final void startScheduling() {
         mainThreadExecutor.assertRunningInMainThread();
-        registerJobMetrics();
+        registerJobMetrics(jobManagerJobMetricGroup, executionGraph, this::getNumberOfRestarts);
         operatorCoordinatorHandler.startAllOperatorCoordinators();
         startSchedulingInternal();
     }
 
-    private void registerJobMetrics() {
-        jobManagerJobMetricGroup.gauge(MetricNames.NUM_RESTARTS, this::getNumberOfRestarts);
-        jobManagerJobMetricGroup.gauge(MetricNames.FULL_RESTARTS, this::getNumberOfRestarts);
+    public static void registerJobMetrics(
+            MetricGroup metrics, // group that all five gauges are registered on
+            JobStatusProvider jobStatusProvider, // backs the restart/down/up time gauges
+            Gauge<Long> numberOfRestarts) { // registered under both restart-count names
+        metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(jobStatusProvider));
+        metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(jobStatusProvider));
+        metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(jobStatusProvider));
+        metrics.gauge(MetricNames.NUM_RESTARTS, numberOfRestarts);
+        metrics.gauge(MetricNames.FULL_RESTARTS, numberOfRestarts); // same gauge instance
     }
 
     protected abstract void startSchedulingInternal();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 796b9c4..4333881 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
-import org.apache.flink.metrics.Gauge;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
@@ -71,7 +70,6 @@ import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
@@ -86,6 +84,7 @@ import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore;
 import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
 import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.scheduler.JobStatusStore;
 import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
@@ -116,6 +115,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Optional;
@@ -178,7 +178,7 @@ public class AdaptiveScheduler
     private final ComponentMainThreadExecutor componentMainThreadExecutor;
     private final FatalErrorHandler fatalErrorHandler;
 
-    private final JobStatusListener jobStatusListener;
+    private final Collection<JobStatusListener> jobStatusListeners;
 
     private final SlotAllocator slotAllocator;
 
@@ -255,7 +255,10 @@ public class AdaptiveScheduler
         declarativeSlotPool.registerNewSlotsListener(this::newResourcesAvailable);
 
         this.componentMainThreadExecutor = mainThreadExecutor;
-        this.jobStatusListener = Preconditions.checkNotNull(jobStatusListener);
+
+        final JobStatusStore jobStatusStore = new JobStatusStore(initializationTimestamp);
+        this.jobStatusListeners =
+                Arrays.asList(Preconditions.checkNotNull(jobStatusListener), jobStatusStore);
 
         this.scaleUpController = new ReactiveScaleUpController(configuration);
 
@@ -265,7 +268,8 @@ public class AdaptiveScheduler
 
         this.executionGraphFactory = executionGraphFactory;
 
-        registerMetrics();
+        SchedulerBase.registerJobMetrics(
+                jobManagerJobMetricGroup, jobStatusStore, () -> (long) numRestarts);
     }
 
     private static void assertPreconditions(JobGraph jobGraph) throws RuntimeException {
@@ -392,12 +396,6 @@ public class AdaptiveScheduler
                 "newResourcesAvailable");
     }
 
-    private void registerMetrics() {
-        final Gauge<Integer> numRestartsMetric = () -> numRestarts;
-        jobManagerJobMetricGroup.gauge(MetricNames.NUM_RESTARTS, numRestartsMetric);
-        jobManagerJobMetricGroup.gauge(MetricNames.FULL_RESTARTS, numRestartsMetric);
-    }
-
     @Override
     public void startScheduling() {
         state.as(Created.class)
@@ -1157,10 +1155,11 @@ public class AdaptiveScheduler
             final JobStatus newJobStatus = state.getJobStatus();
 
             if (previousJobStatus != newJobStatus) {
-                jobStatusListener.jobStatusChanges(
-                        jobInformation.getJobID(),
-                        state.getJobStatus(),
-                        System.currentTimeMillis());
+                final long timestamp = System.currentTimeMillis();
+                jobStatusListeners.forEach(
+                        listener ->
+                                listener.jobStatusChanges(
+                                        jobInformation.getJobID(), newJobStatus, timestamp));
             }
 
             return targetStateInstance;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/JobStatusStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/JobStatusStoreTest.java
new file mode 100644
index 0000000..4713b11
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/JobStatusStoreTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+class JobStatusStoreTest {
+
+    @Test
+    void initialState() {
+        final JobStatusStore store = new JobStatusStore(0);
+        assertThat(store.getState(), is(JobStatus.INITIALIZING)); // before any status change
+    }
+
+    @Test
+    void initialTimestamps() {
+        final JobStatusStore store = new JobStatusStore(967823L);
+
+        for (JobStatus jobStatus : JobStatus.values()) {
+            switch (jobStatus) {
+                case INITIALIZING: // seeded by the constructor argument
+                    assertThat(store.getStatusTimestamp(JobStatus.INITIALIZING), is(967823L));
+                    break;
+                default: // no timestamp has been recorded for any other status
+                    assertThat(store.getStatusTimestamp(jobStatus), is(0L));
+            }
+        }
+    }
+
+    @Test
+    void getState() {
+        final JobStatusStore store = new JobStatusStore(0L);
+        store.jobStatusChanges(new JobID(), JobStatus.RUNNING, 1L); // arbitrary job id
+
+        assertThat(store.getState(), is(JobStatus.RUNNING));
+    }
+
+    @Test
+    void getStatusTimestamp() {
+        final JobStatusStore store = new JobStatusStore(0L);
+        store.jobStatusChanges(new JobID(), JobStatus.RUNNING, 1L); // arbitrary job id
+
+        assertThat(store.getStatusTimestamp(JobStatus.RUNNING), is(1L));
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 92658ec..458098d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -44,6 +44,9 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
+import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
+import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -108,6 +111,7 @@ import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGr
 import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
 import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.core.Is.is;
@@ -412,13 +416,13 @@ public class AdaptiveSchedulerTest extends TestLogger {
 
     @Test
     public void testNumRestartsMetric() throws Exception {
-        final CompletableFuture<Gauge<Integer>> numRestartsMetricFuture = new CompletableFuture<>();
+        final CompletableFuture<Gauge<Long>> numRestartsMetricFuture = new CompletableFuture<>();
         final MetricRegistry metricRegistry =
                 TestingMetricRegistry.builder()
                         .setRegisterConsumer(
                                 (metric, name, group) -> {
                                     if (MetricNames.NUM_RESTARTS.equals(name)) {
-                                        numRestartsMetricFuture.complete((Gauge<Integer>) metric);
+                                        numRestartsMetricFuture.complete((Gauge<Long>) metric);
                                     }
                                 })
                         .build();
@@ -447,7 +451,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
                         .setDeclarativeSlotPool(declarativeSlotPool)
                         .build();
 
-        final Gauge<Integer> numRestartsMetric = numRestartsMetricFuture.get();
+        final Gauge<Long> numRestartsMetric = numRestartsMetricFuture.get();
 
         final SubmissionBufferingTaskManagerGateway taskManagerGateway =
                 new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM);
@@ -469,7 +473,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
         // wait for the first task submission
         taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5));
 
-        assertThat(numRestartsMetric.getValue(), is(0));
+        assertThat(numRestartsMetric.getValue(), is(0L));
 
         singleThreadMainThreadExecutor.execute(
                 () -> {
@@ -485,7 +489,102 @@ public class AdaptiveSchedulerTest extends TestLogger {
         // wait for the second task submissions
         taskManagerGateway.waitForSubmissions(PARALLELISM, Duration.ofSeconds(5));
 
-        assertThat(numRestartsMetric.getValue(), is(1));
+        assertThat(numRestartsMetric.getValue(), is(1L));
+    }
+
+    @Test
+    public void testStatusMetrics() throws Exception {
+        final CompletableFuture<UpTimeGauge> upTimeMetricFuture = new CompletableFuture<>();
+        final CompletableFuture<DownTimeGauge> downTimeMetricFuture = new CompletableFuture<>();
+        final CompletableFuture<RestartTimeGauge> restartTimeMetricFuture =
+                new CompletableFuture<>();
+        final MetricRegistry metricRegistry =
+                TestingMetricRegistry.builder()
+                        .setRegisterConsumer(
+                                (metric, name, group) -> {
+                                    switch (name) { // other metric names are ignored
+                                        case UpTimeGauge.METRIC_NAME:
+                                            upTimeMetricFuture.complete((UpTimeGauge) metric);
+                                            break;
+                                        case DownTimeGauge.METRIC_NAME:
+                                            downTimeMetricFuture.complete((DownTimeGauge) metric);
+                                            break;
+                                        case RestartTimeGauge.METRIC_NAME:
+                                            restartTimeMetricFuture.complete(
+                                                    (RestartTimeGauge) metric);
+                                            break;
+                                    }
+                                })
+                        .build();
+
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(10L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setJobManagerJobMetricGroup(
+                                JobManagerMetricGroup.createJobManagerMetricGroup(
+                                                metricRegistry, "localhost")
+                                        .addJob(new JobID(), "jobName"))
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        final UpTimeGauge upTimeGauge = upTimeMetricFuture.get();
+        final DownTimeGauge downTimeGauge = downTimeMetricFuture.get();
+        final RestartTimeGauge restartTimeGauge = restartTimeMetricFuture.get();
+
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM);
+
+        taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler));
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+                            taskManagerGateway);
+                });
+
+        // wait for the first task submission
+        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5));
+
+        // sleep a bit to ensure uptime is > 0
+        Thread.sleep(10L);
+
+        assertThat(upTimeGauge.getValue(), greaterThan(0L));
+        assertThat(downTimeGauge.getValue(), is(0L));
+        assertThat(restartTimeGauge.getValue(), is(0L)); // no restart has been triggered yet
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    // offer more slots, which will cause a restart in order to scale up
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+                            taskManagerGateway);
+                });
+
+        // wait for the task submissions that follow the scale-up restart
+        taskManagerGateway.waitForSubmissions(2, Duration.ofSeconds(5));
+
+        // sleep a bit to ensure uptime is > 0
+        Thread.sleep(10L);
+
+        assertThat(upTimeGauge.getValue(), greaterThan(0L));
+        assertThat(downTimeGauge.getValue(), is(0L));
+        assertThat(restartTimeGauge.getValue(), greaterThan(0L)); // expected after the restart
     }
 
     // ---------------------------------------------------------------------------------------------
@@ -1127,7 +1226,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
 
         @Override
         public JobStatus getJobStatus() {
-            return null;
+            return JobStatus.RUNNING;
         }
 
         @Override