You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/01/16 12:30:38 UTC

[flink] branch master updated (f5429e8 -> 8332e9b)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from f5429e8  [FLINK-24444][runtime][tests] Wait until checkpoints stopped triggering
     new 397ca61  [hotfix][tests] Refactor scheduler factory methods
     new 8332e9b  [FLINK-23976][metrics] Add additional job status metrics

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content.zh/docs/ops/metrics.md                |  33 ++-
 docs/content/docs/ops/metrics.md                   |  43 +++-
 .../shortcodes/generated/metric_configuration.html |   6 +
 .../apache/flink/configuration/MetricOptions.java  |  87 +++++++
 .../executiongraph/metrics/RestartTimeGauge.java   |  80 -------
 .../flink/runtime/scheduler/SchedulerBase.java     | 114 ++++++++-
 .../scheduler/adaptive/AdaptiveScheduler.java      |  22 +-
 .../metrics/RestartTimeGaugeTest.java              |  84 -------
 .../jobmaster/slotpool/SlotPoolTestUtils.java      |   8 +
 .../runtime/scheduler/DefaultSchedulerTest.java    | 141 ++++++++++--
 .../flink/runtime/scheduler/SchedulerBaseTest.java | 254 +++++++++++++++++++++
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |  19 +-
 12 files changed, 677 insertions(+), 214 deletions(-)
 delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGaugeTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerBaseTest.java

[flink] 02/02: [FLINK-23976][metrics] Add additional job status metrics

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8332e9bbe0a3b490a85585367a82e87d9044aae9
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Dec 17 12:22:38 2021 +0100

    [FLINK-23976][metrics] Add additional job status metrics
---
 docs/content.zh/docs/ops/metrics.md                |  33 ++-
 docs/content/docs/ops/metrics.md                   |  43 +++-
 .../shortcodes/generated/metric_configuration.html |   6 +
 .../apache/flink/configuration/MetricOptions.java  |  87 +++++++
 .../executiongraph/metrics/RestartTimeGauge.java   |  80 -------
 .../flink/runtime/scheduler/SchedulerBase.java     | 114 ++++++++-
 .../scheduler/adaptive/AdaptiveScheduler.java      |  22 +-
 .../metrics/RestartTimeGaugeTest.java              |  84 -------
 .../jobmaster/slotpool/SlotPoolTestUtils.java      |   8 +
 .../runtime/scheduler/DefaultSchedulerTest.java    | 100 ++++++++
 .../flink/runtime/scheduler/SchedulerBaseTest.java | 254 +++++++++++++++++++++
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |  19 +-
 12 files changed, 655 insertions(+), 195 deletions(-)

diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md
index dfd961a..2b8a5fd 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -1066,6 +1066,9 @@ Metrics related to data exchange between task executors using netty network comm
 
 ### Availability
 
+The metrics in this table are available for each of the following job states: INITIALIZING, CREATED, RUNNING, RESTARTING, CANCELLING, FAILING.
+Whether these metrics are reported depends on the [metrics.job.status.enable]({{< ref "docs/deployment/config" >}}#metrics-job-status-enable) setting.
+
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -1077,12 +1080,36 @@ Metrics related to data exchange between task executors using netty network comm
   </thead>
   <tbody>
     <tr>
-      <th rowspan="5"><strong>Job (only available on JobManager)</strong></th>
-      <td>restartingTime</td>
-      <td>The time it took to restart the job, or how long the current restart has been in progress (in milliseconds).</td>
+      <th rowspan="3"><strong>Job (only available on JobManager)</strong></th>
+      <td>&lt;jobStatus&gt;State</td>
+      <td>For a given state, return 1 if the job is currently in that state, otherwise return 0.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>&lt;jobStatus&gt;Time</td>
+      <td>For a given state, if the job is currently in that state, return the time (in milliseconds) since the job transitioned into that state, otherwise return 0.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>&lt;jobStatus&gt;TimeTotal</td>
+      <td>For a given state, return how much time (in milliseconds) the job has spent in that state in total.</td>
       <td>Gauge</td>
     </tr>
+  </tbody>
+</table>
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 18%">Scope</th>
+      <th class="text-left" style="width: 26%">Metrics</th>
+      <th class="text-left" style="width: 48%">Description</th>
+      <th class="text-left" style="width: 8%">Type</th>
+    </tr>
+  </thead>
+  <tbody>
     <tr>
+      <th rowspan="4"><strong>Job (only available on JobManager)</strong></th>
       <td>uptime</td>
       <td>
         The time that the job has been running without interruption.
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index e6a09ea..88a9bd7 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -1066,6 +1066,9 @@ Metrics related to data exchange between task executors using netty network comm
 
 ### Availability
 
+The metrics in this table are available for each of the following job states: INITIALIZING, CREATED, RUNNING, RESTARTING, CANCELLING, FAILING.
+Whether these metrics are reported depends on the [metrics.job.status.enable]({{< ref "docs/deployment/config" >}}#metrics-job-status-enable) setting.
+
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -1077,25 +1080,43 @@ Metrics related to data exchange between task executors using netty network comm
   </thead>
   <tbody>
     <tr>
-      <th rowspan="5"><strong>Job (only available on JobManager)</strong></th>
-      <td>restartingTime</td>
-      <td>The time it took to restart the job, or how long the current restart has been in progress (in milliseconds).</td>
+      <th rowspan="3"><strong>Job (only available on JobManager)</strong></th>
+      <td>&lt;jobStatus&gt;State</td>
+      <td>For a given state, return 1 if the job is currently in that state, otherwise return 0.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>&lt;jobStatus&gt;Time</td>
+      <td>For a given state, if the job is currently in that state, return the time (in milliseconds) since the job transitioned into that state, otherwise return 0.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>&lt;jobStatus&gt;TimeTotal</td>
+      <td>For a given state, return how much time (in milliseconds) the job has spent in that state in total.</td>
       <td>Gauge</td>
     </tr>
+  </tbody>
+</table>
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 18%">Scope</th>
+      <th class="text-left" style="width: 26%">Metrics</th>
+      <th class="text-left" style="width: 48%">Description</th>
+      <th class="text-left" style="width: 8%">Type</th>
+    </tr>
+  </thead>
+  <tbody>
     <tr>
+      <th rowspan="4"><strong>Job (only available on JobManager)</strong></th>
       <td>uptime</td>
-      <td>
-        The time that the job has been running without interruption.
-        <p>Returns -1 for completed jobs (in milliseconds).</p>
-      </td>
+      <td><span class="label label-danger">Attention:</span> deprecated, use <b>runningTime</b>.</td>
       <td>Gauge</td>
     </tr>
     <tr>
       <td>downtime</td>
-      <td>
-        For jobs currently in a failing/recovering situation, the time elapsed during this outage.
-        <p>Returns 0 for running jobs and -1 for completed jobs (in milliseconds).</p>
-      </td>
+      <td><span class="label label-danger">Attention:</span> deprecated, use <b>restartingTime</b>, <b>cancellingTime</b> or <b>failingTime</b>.</td>
       <td>Gauge</td>
     </tr>
     <tr>
diff --git a/docs/layouts/shortcodes/generated/metric_configuration.html b/docs/layouts/shortcodes/generated/metric_configuration.html
index b906b38..c0fbc86 100644
--- a/docs/layouts/shortcodes/generated/metric_configuration.html
+++ b/docs/layouts/shortcodes/generated/metric_configuration.html
@@ -27,6 +27,12 @@
             <td>The thread priority used for Flink's internal metric query service. The thread is created by Akka's thread pool executor. The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). Warning, increasing this value may bring the main Flink components down.</td>
         </tr>
         <tr>
+            <td><h5>metrics.job.status.enable</h5></td>
+            <td style="word-wrap: break-word;">CURRENT_TIME</td>
+            <td><p>List&lt;Enum&gt;</p></td>
+            <td>The selection of job status metrics that should be reported.<br /><br />Possible values:<ul><li>"STATE": For a given state, return 1 if the job is currently in that state, otherwise return 0.</li><li>"CURRENT_TIME": For a given state, if the job is currently in that state, return the time since the job transitioned into that state, otherwise return 0.</li><li>"TOTAL_TIME": For a given state, return how much time the job has spent in that state in total.</li></ul></td>
+        </tr>
+        <tr>
             <td><h5>metrics.latency.granularity</h5></td>
             <td style="word-wrap: break-word;">"operator"</td>
             <td>String</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 0ba3cf5..86f787d6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -21,8 +21,11 @@ package org.apache.flink.configuration;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.InlineElement;
+import org.apache.flink.configuration.description.TextElement;
 
 import java.time.Duration;
+import java.util.List;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.apache.flink.configuration.description.TextElement.text;
@@ -215,5 +218,89 @@ public class MetricOptions {
                                     + "faster updating metrics. Increase this value if the metric fetcher causes too much load. Setting this value to 0 "
                                     + "disables the metric fetching completely.");
 
+    /** Controls which kinds of job status metrics are reported; defaults to CURRENT_TIME. */
+    public static final ConfigOption<List<JobStatusMetrics>> JOB_STATUS_METRICS =
+            key("metrics.job.status.enable")
+                    .enumType(JobStatusMetrics.class)
+                    .asList()
+                    .defaultValues(JobStatusMetrics.CURRENT_TIME)
+                    .withDescription(
+                            "The selection of job status metrics that should be reported.");
+
+    /** Kinds of job status metrics that can be selected via {@link #JOB_STATUS_METRICS}. */
+    public enum JobStatusMetrics implements DescribedEnum {
+        STATE(
+                "For a given state, return 1 if the job is currently in that state, otherwise return 0."),
+        CURRENT_TIME(
+                "For a given state, if the job is currently in that state, return the time since the job transitioned into that state, otherwise return 0."),
+        TOTAL_TIME(
+                "For a given state, return how much time the job has spent in that state in total."),
+        ;
+
+        private final String description;
+
+        JobStatusMetrics(String description) {
+            this.description = description;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return TextElement.text(description);
+        }
+    }
+
+    /** Which job status metric kinds are enabled, as parsed from {@link #JOB_STATUS_METRICS}. */
+    public static final class JobStatusMetricsSettings {
+
+        private final boolean stateMetricsEnabled;
+        private final boolean currentTimeMetricsEnabled;
+        private final boolean totalTimeMetricsEnabled;
+
+        private JobStatusMetricsSettings(
+                boolean stateMetricsEnabled,
+                boolean currentTimeMetricsEnabled,
+                boolean totalTimeMetricsEnabled) {
+            this.stateMetricsEnabled = stateMetricsEnabled;
+            this.currentTimeMetricsEnabled = currentTimeMetricsEnabled;
+            this.totalTimeMetricsEnabled = totalTimeMetricsEnabled;
+        }
+
+        public boolean isStateMetricsEnabled() {
+            return stateMetricsEnabled;
+        }
+
+        public boolean isCurrentTimeMetricsEnabled() {
+            return currentTimeMetricsEnabled;
+        }
+
+        public boolean isTotalTimeMetricsEnabled() {
+            return totalTimeMetricsEnabled;
+        }
+
+        public static JobStatusMetricsSettings fromConfiguration(Configuration configuration) {
+            final List<JobStatusMetrics> jobStatusMetrics = configuration.get(JOB_STATUS_METRICS);
+            boolean stateMetricsEnabled = false;
+            boolean currentTimeMetricsEnabled = false;
+            boolean totalTimeMetricsEnabled = false;
+            // Each listed kind switches its flag on; kinds that are not listed stay disabled.
+            for (JobStatusMetrics jobStatusMetric : jobStatusMetrics) {
+                switch (jobStatusMetric) {
+                    case STATE:
+                        stateMetricsEnabled = true;
+                        break;
+                    case CURRENT_TIME:
+                        currentTimeMetricsEnabled = true;
+                        break;
+                    case TOTAL_TIME:
+                        totalTimeMetricsEnabled = true;
+                        break;
+                }
+            }
+
+            return new JobStatusMetricsSettings(
+                    stateMetricsEnabled, currentTimeMetricsEnabled, totalTimeMetricsEnabled);
+        }
+    }
+
     private MetricOptions() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java
deleted file mode 100644
index 6840f7e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph.metrics;
-
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.runtime.executiongraph.JobStatusProvider;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Gauge which returns the last restarting time.
- *
- * <p>Restarting time is the time between {@link JobStatus#RESTARTING} and {@link
- * JobStatus#RUNNING}, or a terminal state if {@link JobStatus#RUNNING} was not reached.
- *
- * <p>If the job has not yet reached either of these states, then the time is measured since
- * reaching {@link JobStatus#RESTARTING}. If it is still the initial job execution, then the gauge
- * will return 0.
- */
-public class RestartTimeGauge implements Gauge<Long> {
-
-    public static final String METRIC_NAME = "restartingTime";
-
-    // ------------------------------------------------------------------------
-
-    private final JobStatusProvider jobStatusProvider;
-
-    public RestartTimeGauge(JobStatusProvider jobStatusProvider) {
-        this.jobStatusProvider = checkNotNull(jobStatusProvider);
-    }
-
-    // ------------------------------------------------------------------------
-
-    @Override
-    public Long getValue() {
-        final JobStatus status = jobStatusProvider.getState();
-
-        final long restartingTimestamp = jobStatusProvider.getStatusTimestamp(JobStatus.RESTARTING);
-
-        final long switchToRunningTimestamp;
-        final long lastRestartTime;
-
-        if (restartingTimestamp <= 0) {
-            // we haven't yet restarted our job
-            return 0L;
-        } else if ((switchToRunningTimestamp =
-                        jobStatusProvider.getStatusTimestamp(JobStatus.RUNNING))
-                >= restartingTimestamp) {
-            // we have transitioned to RUNNING since the last restart
-            lastRestartTime = switchToRunningTimestamp - restartingTimestamp;
-        } else if (status.isTerminalState()) {
-            // since the last restart we've switched to a terminal state without touching
-            // the RUNNING state (e.g. failing from RESTARTING)
-            lastRestartTime = jobStatusProvider.getStatusTimestamp(status) - restartingTimestamp;
-        } else {
-            // we're still somewhere between RESTARTING and RUNNING
-            lastRestartTime = System.currentTimeMillis() - restartingTimestamp;
-        }
-
-        // we guard this with 'Math.max' to avoid negative timestamps when clocks re-sync
-        return Math.max(lastRestartTime, 0);
-    }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index db92b7c..10c063c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
@@ -55,7 +56,6 @@ import org.apache.flink.runtime.executiongraph.JobStatusProvider;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
 import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
-import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
 import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -92,6 +92,8 @@ import org.apache.flink.runtime.util.IntArrayList;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.IterableUtils;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.slf4j.Logger;
@@ -105,6 +107,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
@@ -112,6 +115,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -156,6 +160,8 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
 
     private final ExecutionGraphFactory executionGraphFactory;
 
+    private final MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings;
+
     public SchedulerBase(
             final Logger log,
             final JobGraph jobGraph,
@@ -223,6 +229,9 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
         this.exceptionHistory =
                 new BoundedFIFOQueue<>(
                         jobMasterConfiguration.getInteger(WebOptions.MAX_EXCEPTION_HISTORY_SIZE));
+
+        this.jobStatusMetricsSettings =
+                MetricOptions.JobStatusMetricsSettings.fromConfiguration(jobMasterConfiguration);
     }
 
     private void shutDownCheckpointServices(JobStatus jobStatus) {
@@ -599,7 +608,13 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
     @Override
     public final void startScheduling() {
         mainThreadExecutor.assertRunningInMainThread();
-        registerJobMetrics(jobManagerJobMetricGroup, executionGraph, this::getNumberOfRestarts);
+        registerJobMetrics(
+                jobManagerJobMetricGroup,
+                executionGraph,
+                this::getNumberOfRestarts,
+                executionGraph::registerJobStatusListener,
+                executionGraph.getStatusTimestamp(JobStatus.INITIALIZING),
+                jobStatusMetricsSettings);
         operatorCoordinatorHandler.startAllOperatorCoordinators();
         startSchedulingInternal();
     }
@@ -607,12 +622,103 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
     public static void registerJobMetrics(
             MetricGroup metrics,
             JobStatusProvider jobStatusProvider,
-            Gauge<Long> numberOfRestarts) {
-        metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(jobStatusProvider));
+            Gauge<Long> numberOfRestarts,
+            Consumer<JobStatusListener> jobStatusListenerRegistrar,
+            long initializationTimestamp,
+            MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings) {
         metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(jobStatusProvider));
         metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(jobStatusProvider));
         metrics.gauge(MetricNames.NUM_RESTARTS, numberOfRestarts);
         metrics.gauge(MetricNames.FULL_RESTARTS, numberOfRestarts);
+        // Per-state gauges are kept current by a job status listener handed to the registrar.
+        jobStatusListenerRegistrar.accept(
+                new JobStatusMetrics(metrics, initializationTimestamp, jobStatusMetricsSettings));
+    }
+
+    @VisibleForTesting
+    static class JobStatusMetrics implements JobStatusListener {
+        // NOTE(review): fields are not volatile; confirm the gauges are read on a safe thread.
+        private JobStatus currentStatus = JobStatus.INITIALIZING;
+        private long currentStatusTimestamp;
+        private final long[] cumulativeStatusTimes;
+
+        public JobStatusMetrics(
+                MetricGroup metricGroup,
+                long initializationTimestamp,
+                MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings) {
+            // Start in INITIALIZING, timed from the caller-supplied initialization timestamp.
+            currentStatus = JobStatus.INITIALIZING;
+            currentStatusTimestamp = initializationTimestamp;
+            cumulativeStatusTimes = new long[JobStatus.values().length];
+
+            for (JobStatus jobStatus : JobStatus.values()) {
+                if (!jobStatus.isTerminalState() && jobStatus != JobStatus.RECONCILING) {
+                    // Register only the metric kinds enabled in the settings.
+                    if (jobStatusMetricsSettings.isStateMetricsEnabled()) {
+                        metricGroup.gauge(
+                                getStateMetricName(jobStatus), createStateMetric(jobStatus));
+                    }
+
+                    if (jobStatusMetricsSettings.isCurrentTimeMetricsEnabled()) {
+                        metricGroup.gauge(
+                                getCurrentTimeMetricName(jobStatus),
+                                createCurrentTimeMetric(jobStatus, SystemClock.getInstance()));
+                    }
+
+                    if (jobStatusMetricsSettings.isTotalTimeMetricsEnabled()) {
+                        metricGroup.gauge(
+                                getTotalTimeMetricName(jobStatus),
+                                createTotalTimeMetric(jobStatus, SystemClock.getInstance()));
+                    }
+                }
+            }
+        }
+
+        @VisibleForTesting
+        Gauge<Long> createStateMetric(JobStatus jobStatus) {
+            return () -> currentStatus == jobStatus ? 1L : 0L;
+        }
+        // Time spent in the current state so far, clamped at 0 so it is never negative.
+        @VisibleForTesting
+        Gauge<Long> createCurrentTimeMetric(JobStatus jobStatus, Clock clock) {
+            return () ->
+                    currentStatus == jobStatus
+                            ? Math.max(clock.absoluteTimeMillis() - currentStatusTimestamp, 0)
+                            : 0;
+        }
+        // Completed time in the state plus, if the job is in it now, the ongoing interval.
+        @VisibleForTesting
+        Gauge<Long> createTotalTimeMetric(JobStatus jobStatus, Clock clock) {
+            return () ->
+                    currentStatus == jobStatus
+                            ? cumulativeStatusTimes[jobStatus.ordinal()]
+                                    + Math.max(
+                                            clock.absoluteTimeMillis() - currentStatusTimestamp, 0)
+                            : cumulativeStatusTimes[jobStatus.ordinal()];
+        }
+
+        @VisibleForTesting
+        static String getStateMetricName(JobStatus jobStatus) {
+            return jobStatus.name().toLowerCase(Locale.ROOT) + "State";
+        }
+
+        @VisibleForTesting
+        static String getCurrentTimeMetricName(JobStatus jobStatus) {
+            return jobStatus.name().toLowerCase(Locale.ROOT) + "Time";
+        }
+
+        @VisibleForTesting
+        static String getTotalTimeMetricName(JobStatus jobStatus) {
+            return jobStatus.name().toLowerCase(Locale.ROOT) + "TimeTotal";
+        }
+
+        @Override
+        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
+            cumulativeStatusTimes[currentStatus.ordinal()] += timestamp - currentStatusTimestamp;
+            // The old state's interval is banked above; now switch to the new state.
+            currentStatus = newJobStatus;
+            currentStatusTimestamp = timestamp;
+        }
     }
 
     protected abstract void startSchedulingInternal();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 06588bc..db6c2a17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.JobException;
@@ -115,8 +116,9 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.time.Duration;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -189,6 +191,7 @@ public class AdaptiveScheduler
     private final Duration resourceStabilizationTimeout;
 
     private final ExecutionGraphFactory executionGraphFactory;
+    private final JobStatusStore jobStatusStore;
 
     private State state = new Created(this, LOG);
 
@@ -258,9 +261,7 @@ public class AdaptiveScheduler
 
         this.componentMainThreadExecutor = mainThreadExecutor;
 
-        final JobStatusStore jobStatusStore = new JobStatusStore(initializationTimestamp);
-        this.jobStatusListeners =
-                Arrays.asList(Preconditions.checkNotNull(jobStatusListener), jobStatusStore);
+        this.jobStatusStore = new JobStatusStore(initializationTimestamp);
 
         this.scaleUpController = new ReactiveScaleUpController(configuration);
 
@@ -270,8 +271,19 @@ public class AdaptiveScheduler
 
         this.executionGraphFactory = executionGraphFactory;
 
+        final Collection<JobStatusListener> tmpJobStatusListeners = new ArrayList<>();
+        tmpJobStatusListeners.add(Preconditions.checkNotNull(jobStatusListener));
+        tmpJobStatusListeners.add(jobStatusStore);
+
         SchedulerBase.registerJobMetrics(
-                jobManagerJobMetricGroup, jobStatusStore, () -> (long) numRestarts);
+                jobManagerJobMetricGroup,
+                jobStatusStore,
+                () -> (long) numRestarts,
+                tmpJobStatusListeners::add,
+                initializationTimestamp,
+                MetricOptions.JobStatusMetricsSettings.fromConfiguration(configuration));
+
+        jobStatusListeners = Collections.unmodifiableCollection(tmpJobStatusListeners);
     }
 
     private static void assertPreconditions(JobGraph jobGraph) throws RuntimeException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGaugeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGaugeTest.java
deleted file mode 100644
index a927508..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGaugeTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph.metrics;
-
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.runtime.executiongraph.TestingJobStatusProvider;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-/** Tests for {@link RestartTimeGauge}. */
-public class RestartTimeGaugeTest extends TestLogger {
-
-    @Test
-    public void testNotRestarted() {
-        final RestartTimeGauge gauge =
-                new RestartTimeGauge(new TestingJobStatusProvider(JobStatus.RUNNING, -1));
-        assertThat(gauge.getValue(), is(0L));
-    }
-
-    @Test
-    public void testInRestarting() {
-        final Map<JobStatus, Long> statusTimestampMap = new HashMap<>();
-        statusTimestampMap.put(JobStatus.RESTARTING, 1L);
-
-        final RestartTimeGauge gauge =
-                new RestartTimeGauge(
-                        new TestingJobStatusProvider(
-                                JobStatus.RESTARTING,
-                                status -> statusTimestampMap.getOrDefault(status, -1L)));
-        assertThat(gauge.getValue(), greaterThan(0L));
-    }
-
-    @Test
-    public void testRunningAfterRestarting() {
-        final Map<JobStatus, Long> statusTimestampMap = new HashMap<>();
-        statusTimestampMap.put(JobStatus.RESTARTING, 123L);
-        statusTimestampMap.put(JobStatus.RUNNING, 234L);
-
-        final RestartTimeGauge gauge =
-                new RestartTimeGauge(
-                        new TestingJobStatusProvider(
-                                JobStatus.RUNNING,
-                                status -> statusTimestampMap.getOrDefault(status, -1L)));
-        assertThat(gauge.getValue(), is(111L));
-    }
-
-    @Test
-    public void testFailedAfterRestarting() {
-        final Map<JobStatus, Long> statusTimestampMap = new HashMap<>();
-        statusTimestampMap.put(JobStatus.RESTARTING, 123L);
-        statusTimestampMap.put(JobStatus.FAILED, 456L);
-
-        final RestartTimeGauge gauge =
-                new RestartTimeGauge(
-                        new TestingJobStatusProvider(
-                                JobStatus.FAILED,
-                                status -> statusTimestampMap.getOrDefault(status, -1L)));
-        assertThat(gauge.getValue(), is(333L));
-    }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java
index f83e7cd..95a6f22 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java
@@ -61,4 +61,16 @@ public final class SlotPoolTestUtils {
         return slotPool.offerSlots(
                 slotOffers, new LocalTaskManagerLocation(), taskManagerGateway, 0);
     }
+
+    /**
+     * Offers {@code slotOffers} to {@code slotPool} from a new {@link LocalTaskManagerLocation},
+     * passing {@code taskManagerGateway} along with that location.
+     */
+    @Nonnull
+    public static Collection<SlotOffer> offerSlots(
+            SlotPool slotPool,
+            Collection<SlotOffer> slotOffers,
+            TaskManagerGateway taskManagerGateway) {
+        return slotPool.offerSlots(new LocalTaskManagerLocation(), taskManagerGateway, slotOffers);
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 46742dc..a63a752 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -19,11 +19,14 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.testutils.ScheduledTask;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -33,6 +36,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -60,6 +64,15 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTest;
 import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntryMatcher;
 import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
@@ -74,6 +87,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -88,11 +102,13 @@ import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.hamcrest.collection.IsIterableWithSize;
 import org.hamcrest.core.Is;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -112,6 +128,8 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
+import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
 import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
 import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableCheckpointing;
 import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator;
@@ -1378,6 +1396,86 @@ public class DefaultSchedulerTest extends TestLogger {
     }
 
     @Test
+    public void testStatusMetrics() throws Exception {
+        // running time acts as a stand-in for generic status time metrics
+        final CompletableFuture<Gauge<Long>> runningTimeMetricFuture = new CompletableFuture<>();
+        final MetricRegistry metricRegistry =
+                TestingMetricRegistry.builder()
+                        .setRegisterConsumer(
+                                (metric, name, group) -> {
+                                    if ("runningTimeTotal".equals(name)) {
+                                        runningTimeMetricFuture.complete((Gauge<Long>) metric);
+                                    }
+                                })
+                        .build();
+
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
+
+        final Configuration configuration = new Configuration();
+        configuration.set(
+                MetricOptions.JOB_STATUS_METRICS,
+                Arrays.asList(MetricOptions.JobStatusMetrics.TOTAL_TIME));
+
+        final ComponentMainThreadExecutor singleThreadMainThreadExecutor =
+                ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        scheduledExecutorService);
+
+        final Time slotTimeout = Time.milliseconds(5L);
+        final SlotPool slotPool =
+                new DeclarativeSlotPoolBridgeBuilder()
+                        .setBatchSlotTimeout(slotTimeout)
+                        .buildAndStart(singleThreadMainThreadExecutor);
+        final PhysicalSlotProvider slotProvider =
+                new PhysicalSlotProviderImpl(
+                        LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
+
+        final DefaultScheduler scheduler =
+                createSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setJobManagerJobMetricGroup(
+                                JobManagerMetricGroup.createJobManagerMetricGroup(
+                                                metricRegistry, "localhost")
+                                        .addJob(new JobID(), "jobName"))
+                        .setExecutionSlotAllocatorFactory(
+                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
+                                        slotProvider, slotTimeout))
+                        .build();
+
+        final AdaptiveSchedulerTest.SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new AdaptiveSchedulerTest.SubmissionBufferingTaskManagerGateway(1);
+
+        taskManagerGateway.setCancelConsumer(
+                executionAttemptId -> {
+                    singleThreadMainThreadExecutor.execute(
+                            () ->
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    executionAttemptId, ExecutionState.CANCELED)));
+                });
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+
+                    offerSlots(
+                            slotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+                            taskManagerGateway);
+                });
+
+        // wait for the first task submission
+        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5));
+
+        // sleep a bit to ensure that the running time is > 0
+        Thread.sleep(10L);
+
+        final Gauge<Long> runningTimeGauge = runningTimeMetricFuture.get();
+        Assert.assertThat(runningTimeGauge.getValue(), greaterThan(0L));
+    }
+
+    @Test
     public void testDeploymentWaitForProducedPartitionRegistration() {
         shuffleMaster.setAutoCompleteRegistration(false);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerBaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerBaseTest.java
new file mode 100644
index 0000000..1584f56
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerBaseTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SchedulerBase.JobStatusMetrics}. */
+class SchedulerBaseTest {
+
+    @Test
+    void testStateMetric() {
+        final SchedulerBase.JobStatusMetrics jobStatusMetrics =
+                new SchedulerBase.JobStatusMetrics(
+                        new UnregisteredMetricsGroup(),
+                        0L,
+                        enable(
+                                MetricOptions.JobStatusMetrics.STATE,
+                                MetricOptions.JobStatusMetrics.CURRENT_TIME,
+                                MetricOptions.JobStatusMetrics.TOTAL_TIME));
+
+        final Gauge<Long> metric = jobStatusMetrics.createStateMetric(JobStatus.RUNNING);
+
+        assertThat(metric.getValue()).isEqualTo(0L);
+        jobStatusMetrics.jobStatusChanges(new JobID(), JobStatus.RUNNING, 1L);
+        assertThat(metric.getValue()).isEqualTo(1L);
+        jobStatusMetrics.jobStatusChanges(new JobID(), JobStatus.RESTARTING, 2L);
+        assertThat(metric.getValue()).isEqualTo(0L);
+    }
+
+    @Test
+    void testCurrentTimeMetric() {
+        final SchedulerBase.JobStatusMetrics jobStatusMetrics =
+                new SchedulerBase.JobStatusMetrics(
+                        new UnregisteredMetricsGroup(),
+                        0L,
+                        enable(
+                                MetricOptions.JobStatusMetrics.STATE,
+                                MetricOptions.JobStatusMetrics.CURRENT_TIME,
+                                MetricOptions.JobStatusMetrics.TOTAL_TIME));
+
+        final ManualClock clock = new ManualClock();
+        final Gauge<Long> metric =
+                jobStatusMetrics.createCurrentTimeMetric(JobStatus.RUNNING, clock);
+
+        assertThat(metric.getValue()).isEqualTo(0L);
+        jobStatusMetrics.jobStatusChanges(new JobID(), JobStatus.RUNNING, 1L);
+        clock.advanceTime(Duration.ofMillis(11));
+        assertThat(metric.getValue()).isEqualTo(10L);
+        jobStatusMetrics.jobStatusChanges(new JobID(), JobStatus.RESTARTING, 15L);
+        assertThat(metric.getValue()).isEqualTo(0L);
+    }
+
+    @Test
+    void testTotalTimeMetric() {
+        final SchedulerBase.JobStatusMetrics jobStatusMetrics =
+                new SchedulerBase.JobStatusMetrics(
+                        new UnregisteredMetricsGroup(),
+                        0L,
+                        enable(
+                                MetricOptions.JobStatusMetrics.STATE,
+                                MetricOptions.JobStatusMetrics.CURRENT_TIME,
+                                MetricOptions.JobStatusMetrics.TOTAL_TIME));
+
+        final ManualClock clock = new ManualClock(0);
+        final Gauge<Long> metric = jobStatusMetrics.createTotalTimeMetric(JobStatus.RUNNING, clock);
+
+        assertThat(metric.getValue()).isEqualTo(0L);
+
+        jobStatusMetrics.jobStatusChanges(
+                new JobID(), JobStatus.RUNNING, clock.absoluteTimeMillis());
+
+        clock.advanceTime(Duration.ofMillis(10));
+        assertThat(metric.getValue()).isEqualTo(10L);
+
+        jobStatusMetrics.jobStatusChanges(
+                new JobID(), JobStatus.RESTARTING, clock.absoluteTimeMillis());
+
+        clock.advanceTime(Duration.ofMillis(4));
+        assertThat(metric.getValue()).isEqualTo(10L);
+
+        jobStatusMetrics.jobStatusChanges(
+                new JobID(), JobStatus.RUNNING, clock.absoluteTimeMillis());
+
+        clock.advanceTime(Duration.ofMillis(1));
+        assertThat(metric.getValue()).isEqualTo(11L);
+    }
+
+    @Test
+    void testStatusSelection() {
+        final InterceptingOperatorMetricGroup metricGroup = new InterceptingOperatorMetricGroup();
+
+        final SchedulerBase.JobStatusMetrics jobStatusMetrics =
+                new SchedulerBase.JobStatusMetrics(
+                        metricGroup, 0L, enable(MetricOptions.JobStatusMetrics.STATE));
+        final Map<JobStatus, StatusMetricSet> registeredMetrics = extractMetrics(metricGroup);
+
+        for (JobStatus value : JobStatus.values()) {
+            if (value.isTerminalState() || value == JobStatus.RECONCILING) {
+                assertThat(registeredMetrics).doesNotContainKey(value);
+            } else {
+                assertThat(registeredMetrics).containsKey(value);
+            }
+        }
+    }
+
+    @Test
+    void testEnableStateMetrics() {
+        testMetricSelection(MetricOptions.JobStatusMetrics.STATE);
+    }
+
+    @Test
+    void testEnableCurrentTimeMetrics() {
+        testMetricSelection(MetricOptions.JobStatusMetrics.CURRENT_TIME);
+    }
+
+    @Test
+    void testEnableTotalTimeMetrics() {
+        testMetricSelection(MetricOptions.JobStatusMetrics.TOTAL_TIME);
+    }
+
+    @Test
+    void testEnableMultipleMetrics() {
+        testMetricSelection(
+                MetricOptions.JobStatusMetrics.CURRENT_TIME,
+                MetricOptions.JobStatusMetrics.TOTAL_TIME);
+    }
+
+    private static void testMetricSelection(MetricOptions.JobStatusMetrics... selectedMetrics) {
+        final EnumSet<MetricOptions.JobStatusMetrics> selectedMetricsSet =
+                EnumSet.noneOf(MetricOptions.JobStatusMetrics.class);
+        selectedMetricsSet.addAll(Arrays.asList(selectedMetrics));
+
+        final InterceptingOperatorMetricGroup metricGroup = new InterceptingOperatorMetricGroup();
+
+        final SchedulerBase.JobStatusMetrics jobStatusMetrics =
+                new SchedulerBase.JobStatusMetrics(metricGroup, 1L, enable(selectedMetrics));
+        final Map<JobStatus, StatusMetricSet> registeredMetrics = extractMetrics(metricGroup);
+
+        for (StatusMetricSet metrics : registeredMetrics.values()) {
+            assertThat(metrics.getState().isPresent())
+                    .isEqualTo(selectedMetricsSet.contains(MetricOptions.JobStatusMetrics.STATE));
+            assertThat(metrics.getCurrentTime().isPresent())
+                    .isEqualTo(
+                            selectedMetricsSet.contains(
+                                    MetricOptions.JobStatusMetrics.CURRENT_TIME));
+            assertThat(metrics.getTotalTime().isPresent())
+                    .isEqualTo(
+                            selectedMetricsSet.contains(MetricOptions.JobStatusMetrics.TOTAL_TIME));
+        }
+    }
+
+    private static MetricOptions.JobStatusMetricsSettings enable(
+            MetricOptions.JobStatusMetrics... enabledMetrics) {
+        final Configuration configuration = new Configuration();
+
+        configuration.set(MetricOptions.JOB_STATUS_METRICS, Arrays.asList(enabledMetrics));
+
+        return MetricOptions.JobStatusMetricsSettings.fromConfiguration(configuration);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Map<JobStatus, StatusMetricSet> extractMetrics(
+            InterceptingOperatorMetricGroup metrics) {
+        final Map<JobStatus, StatusMetricSet> extractedMetrics = new EnumMap<>(JobStatus.class);
+
+        for (JobStatus jobStatus : JobStatus.values()) {
+            final StatusMetricSet statusMetricSet =
+                    new StatusMetricSet(
+                            (Gauge<Long>)
+                                    metrics.get(
+                                            SchedulerBase.JobStatusMetrics.getStateMetricName(
+                                                    jobStatus)),
+                            (Gauge<Long>)
+                                    metrics.get(
+                                            SchedulerBase.JobStatusMetrics.getCurrentTimeMetricName(
+                                                    jobStatus)),
+                            (Gauge<Long>)
+                                    metrics.get(
+                                            SchedulerBase.JobStatusMetrics.getTotalTimeMetricName(
+                                                    jobStatus)));
+            if (statusMetricSet.getState().isPresent()
+                    || statusMetricSet.getCurrentTime().isPresent()
+                    || statusMetricSet.getTotalTime().isPresent()) {
+                extractedMetrics.put(jobStatus, statusMetricSet);
+            }
+        }
+
+        return extractedMetrics;
+    }
+
+    /** The gauges registered for one {@link JobStatus}; unregistered ones are null. */
+    private static class StatusMetricSet {
+
+        @Nullable private final Gauge<Long> state;
+        @Nullable private final Gauge<Long> currentTime;
+        @Nullable private final Gauge<Long> totalTime;
+
+        private StatusMetricSet(
+                @Nullable Gauge<Long> state,
+                @Nullable Gauge<Long> currentTime,
+                @Nullable Gauge<Long> totalTime) {
+            this.state = state;
+            this.currentTime = currentTime;
+            this.totalTime = totalTime;
+        }
+
+        public Optional<Gauge<Long>> getState() {
+            return Optional.ofNullable(state);
+        }
+
+        public Optional<Gauge<Long>> getCurrentTime() {
+            return Optional.ofNullable(currentTime);
+        }
+
+        public Optional<Gauge<Long>> getTotalTime() {
+            return Optional.ofNullable(totalTime);
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 2926316..3ca32d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
@@ -45,7 +46,6 @@ import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
-import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
 import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -93,6 +93,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -496,8 +497,8 @@ public class AdaptiveSchedulerTest extends TestLogger {
     public void testStatusMetrics() throws Exception {
         final CompletableFuture<UpTimeGauge> upTimeMetricFuture = new CompletableFuture<>();
         final CompletableFuture<DownTimeGauge> downTimeMetricFuture = new CompletableFuture<>();
-        final CompletableFuture<RestartTimeGauge> restartTimeMetricFuture =
-                new CompletableFuture<>();
+        // restartingTime acts as a stand-in for generic status time metrics
+        final CompletableFuture<Gauge<Long>> restartTimeMetricFuture = new CompletableFuture<>();
         final MetricRegistry metricRegistry =
                 TestingMetricRegistry.builder()
                         .setRegisterConsumer(
@@ -509,9 +510,8 @@ public class AdaptiveSchedulerTest extends TestLogger {
                                         case DownTimeGauge.METRIC_NAME:
                                             downTimeMetricFuture.complete((DownTimeGauge) metric);
                                             break;
-                                        case RestartTimeGauge.METRIC_NAME:
-                                            restartTimeMetricFuture.complete(
-                                                    (RestartTimeGauge) metric);
+                                        case "restartingTimeTotal":
+                                            restartTimeMetricFuture.complete((Gauge<Long>) metric);
                                             break;
                                     }
                                 })
@@ -525,6 +525,9 @@ public class AdaptiveSchedulerTest extends TestLogger {
         final Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
         configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(10L));
+        configuration.set(
+                MetricOptions.JOB_STATUS_METRICS,
+                Arrays.asList(MetricOptions.JobStatusMetrics.TOTAL_TIME));
 
         final AdaptiveScheduler scheduler =
                 new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
@@ -538,7 +541,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
 
         final UpTimeGauge upTimeGauge = upTimeMetricFuture.get();
         final DownTimeGauge downTimeGauge = downTimeMetricFuture.get();
-        final RestartTimeGauge restartTimeGauge = restartTimeMetricFuture.get();
+        final Gauge<Long> restartTimeGauge = restartTimeMetricFuture.get();
 
         final SubmissionBufferingTaskManagerGateway taskManagerGateway =
                 new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM);
@@ -1202,7 +1205,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
      * A {@link SimpleAckingTaskManagerGateway} that buffers all the task submissions into a
      * blocking queue, allowing one to wait for an arbitrary number of submissions.
      */
-    private static class SubmissionBufferingTaskManagerGateway
+    public static class SubmissionBufferingTaskManagerGateway
             extends SimpleAckingTaskManagerGateway {
         final BlockingQueue<TaskDeploymentDescriptor> submittedTasks;
 

[flink] 01/02: [hotfix][tests] Refactor scheduler factory methods

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 397ca61c238a25f59ec823bceedf254667dbb524
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jan 13 12:04:11 2022 +0100

    [hotfix][tests] Refactor scheduler factory methods
    
    Factory methods were refactored to first create a builder that sets various defaults and then override the defaults.
---
 .../runtime/scheduler/DefaultSchedulerTest.java    | 41 ++++++++++++----------
 1 file changed, 22 insertions(+), 19 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 43beb6a..46742dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -1684,12 +1684,10 @@ public class DefaultSchedulerTest extends TestLogger {
 
     private DefaultScheduler createSchedulerAndStartScheduling(
             final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor) {
-        final SchedulingStrategyFactory schedulingStrategyFactory =
-                new PipelinedRegionSchedulingStrategy.Factory();
 
         try {
             final DefaultScheduler scheduler =
-                    createScheduler(jobGraph, mainThreadExecutor, schedulingStrategyFactory);
+                    createSchedulerBuilder(jobGraph, mainThreadExecutor).build();
             mainThreadExecutor.execute(scheduler::startScheduling);
             return scheduler;
         } catch (Exception e) {
@@ -1702,11 +1700,9 @@ public class DefaultSchedulerTest extends TestLogger {
             final ComponentMainThreadExecutor mainThreadExecutor,
             final SchedulingStrategyFactory schedulingStrategyFactory)
             throws Exception {
-        return createScheduler(
-                jobGraph,
-                mainThreadExecutor,
-                schedulingStrategyFactory,
-                new RestartPipelinedRegionFailoverStrategy.Factory());
+        return createSchedulerBuilder(jobGraph, mainThreadExecutor)
+                .setSchedulingStrategyFactory(schedulingStrategyFactory)
+                .build();
     }
 
     private DefaultScheduler createScheduler(
@@ -1715,12 +1711,10 @@ public class DefaultSchedulerTest extends TestLogger {
             final SchedulingStrategyFactory schedulingStrategyFactory,
             final FailoverStrategy.Factory failoverStrategyFactory)
             throws Exception {
-        return createScheduler(
-                jobGraph,
-                mainThreadExecutor,
-                schedulingStrategyFactory,
-                failoverStrategyFactory,
-                taskRestartExecutor);
+        return createSchedulerBuilder(jobGraph, mainThreadExecutor)
+                .setSchedulingStrategyFactory(schedulingStrategyFactory)
+                .setFailoverStrategyFactory(failoverStrategyFactory)
+                .build();
     }
 
     private DefaultScheduler createScheduler(
@@ -1730,22 +1724,31 @@ public class DefaultSchedulerTest extends TestLogger {
             final FailoverStrategy.Factory failoverStrategyFactory,
             final ScheduledExecutor delayExecutor)
             throws Exception {
+        return createSchedulerBuilder(jobGraph, mainThreadExecutor)
+                .setDelayExecutor(delayExecutor)
+                .setSchedulingStrategyFactory(schedulingStrategyFactory)
+                .setFailoverStrategyFactory(failoverStrategyFactory)
+                .build();
+    }
+
+    private SchedulerTestingUtils.DefaultSchedulerBuilder createSchedulerBuilder(
+            final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor)
+            throws Exception {
         return SchedulerTestingUtils.newSchedulerBuilder(jobGraph, mainThreadExecutor)
                 .setLogger(log)
                 .setIoExecutor(executor)
                 .setJobMasterConfiguration(configuration)
                 .setFutureExecutor(scheduledExecutorService)
-                .setDelayExecutor(delayExecutor)
-                .setSchedulingStrategyFactory(schedulingStrategyFactory)
-                .setFailoverStrategyFactory(failoverStrategyFactory)
+                .setDelayExecutor(taskRestartExecutor)
+                .setSchedulingStrategyFactory(new PipelinedRegionSchedulingStrategy.Factory())
+                .setFailoverStrategyFactory(new RestartPipelinedRegionFailoverStrategy.Factory())
                 .setRestartBackoffTimeStrategy(testRestartBackoffTimeStrategy)
                 .setExecutionVertexOperations(testExecutionVertexOperations)
                 .setExecutionVertexVersioner(executionVertexVersioner)
                 .setExecutionSlotAllocatorFactory(executionSlotAllocatorFactory)
                 .setShuffleMaster(shuffleMaster)
                 .setPartitionTracker(partitionTracker)
-                .setRpcTimeout(timeout)
-                .build();
+                .setRpcTimeout(timeout);
     }
 
     /**