Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/06 18:17:38 UTC

[GitHub] [flink] atris opened a new pull request, #20187: [FLINK-27473] Introduce Generic Metrics For Job States

atris opened a new pull request, #20187:
URL: https://github.com/apache/flink/pull/20187

   This commit introduces a new metric type, uses it to replace the existing deployment metrics, and adds
   initialization time metrics.
   
   A new wrapper is introduced, which creates two metrics, one for deployment and one for initialization. Both are created using the new metric type. Any future metrics should be introduced in the wrapper itself.
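
A minimal sketch of the wrapper idea described above, not the PR's actual code. All type names here (`SketchState`, `SketchStateListener`, `SketchStateMetric`, `SketchMetricsWrapper`) are hypothetical stand-ins for Flink's `ExecutionState`, `ExecutionStateUpdateListener`, and the new metric type, and the metric only counts state entries instead of measuring time:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Flink's ExecutionState; not the real enum.
enum SketchState { CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING }

// Hypothetical stand-in for ExecutionStateUpdateListener.
interface SketchStateListener {
    void onStateUpdate(String attemptId, SketchState previous, SketchState next);
}

// Placeholder metric: counts how often any attempt enters the tracked state.
final class SketchStateMetric implements SketchStateListener {
    private final SketchState tracked;
    private int entered = 0;

    SketchStateMetric(SketchState tracked) {
        this.tracked = tracked;
    }

    @Override
    public void onStateUpdate(String attemptId, SketchState previous, SketchState next) {
        if (next == tracked) {
            entered++;
        }
    }

    int getEntered() {
        return entered;
    }
}

// The wrapper owns one metric per tracked state and forwards every update to each of them.
final class SketchMetricsWrapper implements SketchStateListener {
    final SketchStateMetric deploying = new SketchStateMetric(SketchState.DEPLOYING);
    final SketchStateMetric initializing = new SketchStateMetric(SketchState.INITIALIZING);
    private final List<SketchStateListener> delegates = Arrays.asList(deploying, initializing);

    @Override
    public void onStateUpdate(String attemptId, SketchState previous, SketchState next) {
        for (SketchStateListener delegate : delegates) {
            delegate.onStateUpdate(attemptId, previous, next);
        }
    }
}

final class SketchMetricsWrapperDemo {
    public static void main(String[] args) {
        SketchMetricsWrapper wrapper = new SketchMetricsWrapper();
        wrapper.onStateUpdate("attempt-1", SketchState.SCHEDULED, SketchState.DEPLOYING);
        wrapper.onStateUpdate("attempt-1", SketchState.DEPLOYING, SketchState.INITIALIZING);
        System.out.println("deploying entered: " + wrapper.deploying.getEntered());
        System.out.println("initializing entered: " + wrapper.initializing.getEntered());
    }
}
```

Under this design, a future metric would presumably be added as one more field and one more entry in the delegate list, so the scheduler keeps a single listener.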


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zentol commented on pull request #20187: [FLINK-27473] Introduce Generic Metrics For Job States

Posted by GitBox <gi...@apache.org>.
zentol commented on PR #20187:
URL: https://github.com/apache/flink/pull/20187#issuecomment-1240930119

   I made a pass over the PR and prepared a significant set of changes here: https://github.com/atris/flink/pull/1
   
   I know that this isn't the ideal approach, but otherwise we would've had to go through a lot of iterations (as I just did on my own; I kept finding more things that could be changed :see_no_evil:).




[GitHub] [flink] zentol commented on a diff in pull request #20187: [FLINK-27473] Introduce Generic Metrics For Job States

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #20187:
URL: https://github.com/apache/flink/pull/20187#discussion_r915953497


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/metrics/MetricsWrapper.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
+import org.apache.flink.runtime.jobgraph.JobType;
+
+/** Overarching metrics wrapper which manages multiple state time metrics. */
+public class MetricsWrapper implements ExecutionStateUpdateListener, MetricsRegistrar {

Review Comment:
   ```suggestion
   public class RunningSubStateTimeMetrics implements ExecutionStateUpdateListener, MetricsRegistrar {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/metrics/utils/MetricsPredicateProvider.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.flink.runtime.scheduler.metrics.utils;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobType;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.Set;
+import java.util.function.Predicate;
+
+/** Helper class which provides relevant predicates for state metrics. */
+public class MetricsPredicateProvider {
+
+    /**
+     * Get the predicate which indicates start of metric collection.
+     *
+     * @param semantic Job type
+     * @param executionState Execution state of the job
+     */
+    public static Predicate<JobExecutionStatsHolder> getStartPredicate(
+            JobType semantic, ExecutionState executionState) {
+        Predicate<JobExecutionStatsHolder> resultPredicate;
+        if (semantic == JobType.BATCH) {
+            if (executionState == ExecutionState.INITIALIZING) {
+                // For batch, if no task is running and at least one task is initializing,
+                // the job is initializing
+                resultPredicate =
+                        jobExecutionStatsHolder ->
+                                jobExecutionStatsHolder.getCompletedDeployments() == 0;
+            } else if (executionState == ExecutionState.DEPLOYING) {
+                // If the job type is batch:
+                // Deployment is started iff there is at least one task in deploying state and
+                // no task is either initializing or running.
+                resultPredicate =
+                        deploymentPair ->
+                                deploymentPair.getInitializingDeployments() == 0
+                                        && deploymentPair.getCompletedDeployments() == 0;
+            } else {
+                throw new RuntimeException("Illegal State Passed");
+            }
+        } else {
+            if (executionState == ExecutionState.INITIALIZING) {
+                // For streaming, if no task is deploying and at least one task is
+                // initializing, the job is initializing
+                resultPredicate =
+                        jobExecutionStatsHolder ->
+                                jobExecutionStatsHolder.getPendingDeployments() == 0;
+            } else if (executionState == ExecutionState.DEPLOYING) {
+                // If the job type is streaming:
+                // Deployment is measured from the start of the first deployment
+                resultPredicate = deploymentPair -> true;
+            } else {
+                throw new RuntimeException("Illegal State Passed");
+            }
+        }
+
+        return resultPredicate;
+    }
+
+    /**
+     * Get the predicate which marks end of the metrics collection.
+     *
+     * @param semantic job type
+     * @param executionState current state of the job
+     * @param expectedDeployments Expected deployments count
+     */
+    public static Predicate<JobExecutionStatsHolder> getEndPredicate(
+            JobType semantic, ExecutionState executionState, Set expectedDeployments) {
+        Predicate<JobExecutionStatsHolder> resultPredicate;
+        if (semantic == JobType.BATCH) {
+            if (executionState == ExecutionState.INITIALIZING) {
+                resultPredicate =
+                        jobExecutionStats -> jobExecutionStats.getCompletedDeployments() > 0;
+            } else if (executionState == ExecutionState.DEPLOYING) {
+                resultPredicate =
+                        jobExecutionStatsHolder ->
+                                jobExecutionStatsHolder.getInitializingDeployments() > 0
+                                        || jobExecutionStatsHolder.getCompletedDeployments() > 0;
+            } else {
+                throw new RuntimeException("Illegal State Passed");
+            }
+        } else {
+            if (executionState == ExecutionState.INITIALIZING) {
+                resultPredicate =
+                        jobExecutionStatsHolder ->
+                                jobExecutionStatsHolder.getCompletedDeployments()
+                                        == expectedDeployments.size();
+            } else if (executionState == ExecutionState.DEPLOYING) {
+                resultPredicate =
+                        jobExecutionStatsHolder ->
+                                (jobExecutionStatsHolder.getInitializingDeployments()
+                                                + jobExecutionStatsHolder.getCompletedDeployments())
+                                        == expectedDeployments.size();
+            } else {
+                throw new RuntimeException("Illegal State Passed");
+            }
+        }
+
+        return resultPredicate;
+    }
+
+    /**
+     * Return the auxiliary predicate involved in the metric start. NOTE: The responsibility of
+     * passing in the correct input parameter lies with the caller.
+     *
+     * @param executionState Current state of the job
+     */
+    // NOTE: The responsibility of passing in the correct input parameter lies with the caller

Review Comment:
   That's always the case, no?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/metrics/SubStateTimeMetrics.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.flink.runtime.scheduler.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.scheduler.metrics.utils.JobExecutionStatsHolder;
+import org.apache.flink.runtime.scheduler.metrics.utils.MetricsPredicateProvider;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/**
+ * Defines metrics that capture different aspects of a job execution (time spent in deployment, time
+ * spent in initialization etc). Specifics are passed in through the constructor and then.
+ */
+public class SubStateTimeMetrics
+        implements ExecutionStateUpdateListener, StateTimeMetric, MetricsRegistrar {
+    private static final long NOT_STARTED = -1L;
+
+    private final JobType semantic;
+    private final String metricName;
+    private final MetricOptions.JobStatusMetricsSettings stateTimeMetricSettings;
+    private final ExecutionState currentState;
+
+    private final Predicate<JobExecutionStatsHolder> metricStartPredicate;
+    private final Predicate<JobExecutionStatsHolder> metricEndPredicate;
+
+    private final Clock clock;
+
+    // book-keeping
+    private final Set<ExecutionAttemptID> expectedDeployments = new HashSet<>();
+    private int pendingDeployments = 0;
+    private int initializingDeployments = 0;
+    private int completedDeployments = 0;
+
+    // metrics state
+    private long metricStart = NOT_STARTED;
+    private long metricTimeTotal = 0L;
+
+    public SubStateTimeMetrics(
+            JobType semantic,
+            MetricOptions.JobStatusMetricsSettings stateTimeMetricSettings,
+            ExecutionState currentState,
+            String name) {
+        this(semantic, stateTimeMetricSettings, currentState, SystemClock.getInstance(), name);
+    }
+
+    public SubStateTimeMetrics(

Review Comment:
   ```suggestion
       @VisibleForTesting
       SubStateTimeMetrics(
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -279,14 +279,13 @@ public AdaptiveScheduler(
         final MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings =
                 MetricOptions.JobStatusMetricsSettings.fromConfiguration(configuration);
 
-        deploymentTimeMetrics =
-                new DeploymentStateTimeMetrics(jobGraph.getJobType(), jobStatusMetricsSettings);
+        metricsWrapper = new MetricsWrapper(jobGraph.getJobType(), jobStatusMetricsSettings);

Review Comment:
   The variable needs a better name, similar to the class.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/metrics/SubStateTimeMetricsTest.java:
##########
@@ -0,0 +1,624 @@
+package org.apache.flink.runtime.scheduler.metrics;
+
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
+import static org.apache.flink.runtime.scheduler.metrics.StateTimeMetricTest.enable;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for SubStateTime metrics. */
+public class SubStateTimeMetricsTest {
+    private static final MetricOptions.JobStatusMetricsSettings settings =
+            enable(
+                    MetricOptions.JobStatusMetrics.STATE,
+                    MetricOptions.JobStatusMetrics.CURRENT_TIME,
+                    MetricOptions.JobStatusMetrics.TOTAL_TIME);
+
+    @Test
+    void testInitialValues() {
+        final ManualClock clock = new ManualClock(Duration.ofMillis(5).toNanos());
+
+        final SubStateTimeMetrics initializationStateTimeMetrics =
+                new SubStateTimeMetrics(
+                        JobType.BATCH, settings, ExecutionState.DEPLOYING, clock, "deploying");
+
+        assertThat(initializationStateTimeMetrics.getCurrentTime()).isEqualTo(0L);
+        assertThat(initializationStateTimeMetrics.getTotalTime()).isEqualTo(0L);
+        assertThat(initializationStateTimeMetrics.getBinary()).isEqualTo(0L);
+    }
+
+    @Test
+    void testInitializingOnFirstInitialization() {
+        final SubStateTimeMetrics metrics =
+                new SubStateTimeMetrics(
+                        JobType.BATCH, settings, ExecutionState.INITIALIZING, "initialising");

Review Comment:
   Either infer the metric name from the `ExecutionState`, or add state-specific factory methods to ensure the state and metric name stay consistent. (There's currently a difference between the test and production code: initiali**z**ing vs. initiali**s**ing.)
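
Both options from this comment can be sketched roughly as follows. This is an illustration under assumptions, not the change that was eventually merged: `NamedState` and `NamedStateMetric` are hypothetical stand-ins for `ExecutionState` and `SubStateTimeMetrics`, and the lower-cased enum name is just one possible naming rule.

```java
import java.util.Locale;

// Hypothetical stand-in for Flink's ExecutionState; not the real enum.
enum NamedState { DEPLOYING, INITIALIZING }

// Hypothetical stand-in for SubStateTimeMetrics, reduced to the naming concern.
final class NamedStateMetric {
    private final NamedState state;
    private final String metricName;

    // Private constructor: callers cannot pass a free-form name that drifts from the state.
    private NamedStateMetric(NamedState state, String metricName) {
        this.state = state;
        this.metricName = metricName;
    }

    // Option 1: infer the metric name from the enum constant.
    static NamedStateMetric forState(NamedState state) {
        return new NamedStateMetric(state, state.name().toLowerCase(Locale.ROOT));
    }

    // Option 2: state-specific factory methods that fix the state/name pairing in one place.
    static NamedStateMetric deploying() {
        return forState(NamedState.DEPLOYING);
    }

    static NamedStateMetric initializing() {
        return forState(NamedState.INITIALIZING);
    }

    NamedState getState() {
        return state;
    }

    String getMetricName() {
        return metricName;
    }
}

final class NamedStateMetricDemo {
    public static void main(String[] args) {
        System.out.println(NamedStateMetric.deploying().getMetricName());
        System.out.println(NamedStateMetric.initializing().getMetricName());
    }
}
```

Either way, tests and production code would obtain the name from the same place, so a spelling mismatch like the one noted above could not occur.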



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/metrics/SubStateTimeMetricsTest.java:
##########
@@ -0,0 +1,624 @@
+package org.apache.flink.runtime.scheduler.metrics;
+
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
+import static org.apache.flink.runtime.scheduler.metrics.StateTimeMetricTest.enable;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for SubStateTime metrics. */
+public class SubStateTimeMetricsTest {
+    private static final MetricOptions.JobStatusMetricsSettings settings =
+            enable(
+                    MetricOptions.JobStatusMetrics.STATE,
+                    MetricOptions.JobStatusMetrics.CURRENT_TIME,
+                    MetricOptions.JobStatusMetrics.TOTAL_TIME);
+
+    @Test
+    void testInitialValues() {
+        final ManualClock clock = new ManualClock(Duration.ofMillis(5).toNanos());
+
+        final SubStateTimeMetrics initializationStateTimeMetrics =
+                new SubStateTimeMetrics(
+                        JobType.BATCH, settings, ExecutionState.DEPLOYING, clock, "deploying");
+
+        assertThat(initializationStateTimeMetrics.getCurrentTime()).isEqualTo(0L);
+        assertThat(initializationStateTimeMetrics.getTotalTime()).isEqualTo(0L);
+        assertThat(initializationStateTimeMetrics.getBinary()).isEqualTo(0L);
+    }
+
+    @Test
+    void testInitializingOnFirstInitialization() {
+        final SubStateTimeMetrics metrics =
+                new SubStateTimeMetrics(
+                        JobType.BATCH, settings, ExecutionState.INITIALIZING, "initialising");
+
+        final ExecutionAttemptID id1 = createExecutionAttemptId();
+
+        metrics.onStateUpdate(id1, ExecutionState.CREATED, ExecutionState.SCHEDULED);
+        assertThat(metrics.getBinary()).isEqualTo(0L);
+
+        metrics.onStateUpdate(id1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
+        assertThat(metrics.getBinary()).isEqualTo(0L);
+
+        metrics.onStateUpdate(id1, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
+        assertThat(metrics.getBinary()).isEqualTo(1L);
+    }
+
+    @Test
+    void testInitializingStart_batch_notTriggeredIfOneDeploymentIsRunning() {
+        final SubStateTimeMetrics metrics =
+                new SubStateTimeMetrics(
+                        JobType.BATCH, settings, ExecutionState.INITIALIZING, "initialising");
+
+        final ExecutionAttemptID id1 = createExecutionAttemptId();
+
+        metrics.onStateUpdate(id1, ExecutionState.CREATED, ExecutionState.SCHEDULED);
+        metrics.onStateUpdate(id1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
+        metrics.onStateUpdate(id1, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
+        metrics.onStateUpdate(id1, ExecutionState.INITIALIZING, ExecutionState.RUNNING);
+
+        final ExecutionAttemptID id2 = createExecutionAttemptId();
+
+        metrics.onStateUpdate(id2, ExecutionState.CREATED, ExecutionState.SCHEDULED);
+        metrics.onStateUpdate(id2, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
+        metrics.onStateUpdate(id2, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
+
+        assertThat(metrics.getBinary()).isEqualTo(0L);
+    }
+
+    @Test
+    void testOneInitializingAndOneDeploying_batch() {
+        final SubStateTimeMetrics metrics =
+                new SubStateTimeMetrics(
+                        JobType.BATCH, settings, ExecutionState.INITIALIZING, "initialising");
+
+        final ExecutionAttemptID id1 = createExecutionAttemptId();
+        final ExecutionAttemptID id2 = createExecutionAttemptId();
+
+        metrics.onStateUpdate(id1, ExecutionState.CREATED, ExecutionState.SCHEDULED);
+        metrics.onStateUpdate(id2, ExecutionState.CREATED, ExecutionState.SCHEDULED);
+
+        metrics.onStateUpdate(id1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
+        metrics.onStateUpdate(id2, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
+
+        metrics.onStateUpdate(id1, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
+        assertThat(metrics.getBinary()).isEqualTo(1L);
+    }
+
+    @Test
+    void testOneInitializingAndOneDeploying_streaming() {
+        final SubStateTimeMetrics metrics =
+                new SubStateTimeMetrics(
+                        JobType.STREAMING, settings, ExecutionState.INITIALIZING, "initialising");
+
+        final ExecutionAttemptID id1 = createExecutionAttemptId();
+        final ExecutionAttemptID id2 = createExecutionAttemptId();
+
+        metrics.onStateUpdate(id1, ExecutionState.CREATED, ExecutionState.SCHEDULED);
+        metrics.onStateUpdate(id2, ExecutionState.CREATED, ExecutionState.SCHEDULED);
+
+        metrics.onStateUpdate(id1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
+        metrics.onStateUpdate(id2, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
+
+        metrics.onStateUpdate(id1, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
+        assertThat(metrics.getBinary()).isEqualTo(0L);
+
+        metrics.onStateUpdate(id2, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
+        assertThat(metrics.getBinary()).isEqualTo(1L);
+    }
+
+    @Test
+    void testInitialzingEnd_streaming_ignoresTerminalDeployments() {

Review Comment:
   I'm not really fond of having to duplicate all tests for the 2 states we're interested in _at the moment_.
   
   There should be a `MetricsPredicateProviderTest` for the state-specific logic.
   
   You may also want to pass in the predicates so the tests can be more independent from the `MetricsPredicateProvider`.
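
One way to read the last suggestion is constructor injection of the start and end predicates. The sketch below assumes hypothetical types (`StatsSnapshot` in place of `JobExecutionStatsHolder`, `PredicateDrivenMetric` in place of `SubStateTimeMetrics`) and simplifies the metric to an on/off flag that is re-evaluated on every stats change, which is not how the PR's class is wired:

```java
import java.util.function.Predicate;

// Hypothetical, immutable counterpart of JobExecutionStatsHolder.
final class StatsSnapshot {
    final int pending;
    final int initializing;
    final int completed;

    StatsSnapshot(int pending, int initializing, int completed) {
        this.pending = pending;
        this.initializing = initializing;
        this.completed = completed;
    }
}

// Simplified metric whose start/end conditions are supplied by the caller.
final class PredicateDrivenMetric {
    private final Predicate<StatsSnapshot> startPredicate;
    private final Predicate<StatsSnapshot> endPredicate;
    private boolean active = false;

    PredicateDrivenMetric(
            Predicate<StatsSnapshot> startPredicate, Predicate<StatsSnapshot> endPredicate) {
        this.startPredicate = startPredicate;
        this.endPredicate = endPredicate;
    }

    // Starts the metric when inactive and the start predicate holds;
    // stops it when active and the end predicate holds.
    void onStatsChanged(StatsSnapshot stats) {
        if (!active && startPredicate.test(stats)) {
            active = true;
        } else if (active && endPredicate.test(stats)) {
            active = false;
        }
    }

    boolean isActive() {
        return active;
    }
}

final class PredicateDrivenMetricDemo {
    public static void main(String[] args) {
        // A test can pass trivial predicates and never touch a predicate provider.
        PredicateDrivenMetric metric =
                new PredicateDrivenMetric(
                        stats -> stats.initializing > 0 && stats.completed == 0,
                        stats -> stats.completed > 0);
        metric.onStatsChanged(new StatsSnapshot(0, 1, 0));
        System.out.println("active after first initialization: " + metric.isActive());
    }
}
```

With this shape, the generic start/stop bookkeeping could be tested once with simple lambdas, while the batch and streaming rules would be covered separately in a predicate-provider test.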



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/metrics/utils/MetricsPredicateProvider.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.flink.runtime.scheduler.metrics.utils;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobType;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.Set;
+import java.util.function.Predicate;
+
+/** Helper class which provides relevant predicates for state metrics. */
+public class MetricsPredicateProvider {
+
+    /**
+     * Get the predicate which indicates start of metric collection.
+     *
+     * @param semantic Job type
+     * @param executionState Execution state of the job
+     */
+    public static Predicate<JobExecutionStatsHolder> getStartPredicate(
+            JobType semantic, ExecutionState executionState) {
+        Predicate<JobExecutionStatsHolder> resultPredicate;
+        if (semantic == JobType.BATCH) {
+            if (executionState == ExecutionState.INITIALIZING) {
+                // For batch, if no task is running and at least one task is initializing,
+                // the job is initializing
+                resultPredicate =
+                        jobExecutionStatsHolder ->
+                                jobExecutionStatsHolder.getCompletedDeployments() == 0;
+            } else if (executionState == ExecutionState.DEPLOYING) {
+                // If the job type is batch:
+                // Deployment is started iff there is at least one task in deploying state and
+                // no task is either initializing or running.
+                resultPredicate =
+                        deploymentPair ->
+                                deploymentPair.getInitializingDeployments() == 0
+                                        && deploymentPair.getCompletedDeployments() == 0;
+            } else {
+                throw new RuntimeException("Illegal State Passed");
+            }
+        } else {
+            if (executionState == ExecutionState.INITIALIZING) {
+                // For streaming, if no task is deploying and at least one task is
+                // initializing, the job is initializing
+                resultPredicate =
+                        jobExecutionStatsHolder ->
+                                jobExecutionStatsHolder.getPendingDeployments() == 0;
+            } else if (executionState == ExecutionState.DEPLOYING) {
+                // If the job type is streaming:
+                // Deployment is measured from the start of the first deployment
+                resultPredicate = deploymentPair -> true;
+            } else {
+                throw new RuntimeException("Illegal State Passed");
+            }
+        }
+
+        return resultPredicate;
+    }
+
+    /**
+     * Get the predicate which marks end of the metrics collection.
+     *
+     * @param semantic job type
+     * @param executionState current state of the job
+     * @param expectedDeployments Expected deployments count
+     */
+    public static Predicate<JobExecutionStatsHolder> getEndPredicate(
+            JobType semantic, ExecutionState executionState, Set expectedDeployments) {
+        Predicate<JobExecutionStatsHolder> resultPredicate;
+        if (semantic == JobType.BATCH) {
+            if (executionState == ExecutionState.INITIALIZING) {
+                resultPredicate =
+                        jobExecutionStats -> jobExecutionStats.getCompletedDeployments() > 0;
+            } else if (executionState == ExecutionState.DEPLOYING) {
+                resultPredicate =
+                        jobExecutionStatsHolder ->
+                                jobExecutionStatsHolder.getInitializingDeployments() > 0
+                                        || jobExecutionStatsHolder.getCompletedDeployments() > 0;
+            } else {
+                throw new RuntimeException("Illegal State Passed");
+            }
+        } else {
+            if (executionState == ExecutionState.INITIALIZING) {
+                resultPredicate =
+                        jobExecutionStatsHolder ->
+                                jobExecutionStatsHolder.getCompletedDeployments()
+                                        == expectedDeployments.size();
+            } else if (executionState == ExecutionState.DEPLOYING) {
+                resultPredicate =
+                        jobExecutionStatsHolder ->
+                                (jobExecutionStatsHolder.getInitializingDeployments()
+                                                + jobExecutionStatsHolder.getCompletedDeployments())
+                                        == expectedDeployments.size();
+            } else {
+                throw new RuntimeException("Illegal State Passed");

Review Comment:
   It's always good to include the unsupported value in the exception message.
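
A small sketch of what this could look like, assuming a hypothetical `CheckedState` enum in place of `ExecutionState`; the choice of `IllegalArgumentException` and the message wording are illustrative only:

```java
// Hypothetical stand-in for Flink's ExecutionState; not the real enum.
enum CheckedState { DEPLOYING, INITIALIZING, RUNNING }

final class UnsupportedStateSketch {
    // Accepts only the states the metric can track; otherwise names the offending value.
    static CheckedState requireSupported(CheckedState state) {
        switch (state) {
            case DEPLOYING:
            case INITIALIZING:
                return state;
            default:
                throw new IllegalArgumentException("Unsupported execution state: " + state);
        }
    }

    public static void main(String[] args) {
        System.out.println(requireSupported(CheckedState.DEPLOYING));
        try {
            requireSupported(CheckedState.RUNNING);
        } catch (IllegalArgumentException e) {
            // The message now tells the reader which value was rejected.
            System.out.println(e.getMessage());
        }
    }
}
```

Compared with a fixed message such as "Illegal State Passed", this lets a stack trace identify the bad input without a debugger.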



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/metrics/SubStateTimeMetrics.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.flink.runtime.scheduler.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.scheduler.metrics.utils.JobExecutionStatsHolder;
+import org.apache.flink.runtime.scheduler.metrics.utils.MetricsPredicateProvider;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/**
+ * Defines metrics that capture different aspects of a job execution (time spent in deployment, time
+ * spent in initialization etc). Specifics are passed in through the constructor and then.
+ */
+public class SubStateTimeMetrics
+        implements ExecutionStateUpdateListener, StateTimeMetric, MetricsRegistrar {
+    private static final long NOT_STARTED = -1L;
+
+    private final JobType semantic;
+    private final String metricName;
+    private final MetricOptions.JobStatusMetricsSettings stateTimeMetricSettings;
+    private final ExecutionState currentState;
+
+    private final Predicate<JobExecutionStatsHolder> metricStartPredicate;
+    private final Predicate<JobExecutionStatsHolder> metricEndPredicate;
+
+    private final Clock clock;
+
+    // book-keeping
+    private final Set<ExecutionAttemptID> expectedDeployments = new HashSet<>();
+    private int pendingDeployments = 0;
+    private int initializingDeployments = 0;
+    private int completedDeployments = 0;
+
+    // metrics state
+    private long metricStart = NOT_STARTED;
+    private long metricTimeTotal = 0L;
+
+    public SubStateTimeMetrics(
+            JobType semantic,
+            MetricOptions.JobStatusMetricsSettings stateTimeMetricSettings,
+            ExecutionState currentState,
+            String name) {
+        this(semantic, stateTimeMetricSettings, currentState, SystemClock.getInstance(), name);
+    }
+
+    public SubStateTimeMetrics(
+            JobType semantic,
+            MetricOptions.JobStatusMetricsSettings stateTimeMetricSettings,
+            ExecutionState currentState,
+            Clock clock,
+            String name) {
+        this.semantic = semantic;
+        this.stateTimeMetricSettings = stateTimeMetricSettings;
+        this.currentState = currentState;
+        this.clock = clock;
+        this.metricName = name;
+
+        metricStartPredicate = MetricsPredicateProvider.getStartPredicate(semantic, currentState);
+        metricEndPredicate =
+                MetricsPredicateProvider.getEndPredicate(
+                        semantic, currentState, expectedDeployments);
+    }
+
+    @Override
+    public long getCurrentTime() {
+        return metricStart == NOT_STARTED
+                ? 0L
+                : Math.max(0, clock.absoluteTimeMillis() - metricStart);
+    }
+
+    @Override
+    public long getTotalTime() {
+        return getCurrentTime() + metricTimeTotal;
+    }
+
+    @Override
+    public long getBinary() {
+        return metricStart == NOT_STARTED ? 0L : 1L;
+    }
+
+    @Override
+    public void registerMetrics(MetricGroup metricGroup) {
+        StateTimeMetric.register(stateTimeMetricSettings, metricGroup, this, metricName);
+    }
+
+    @Override
+    public void onStateUpdate(
+            ExecutionAttemptID execution, ExecutionState previousState, ExecutionState newState) {
+        switch (newState) {
+            case SCHEDULED:
+                expectedDeployments.add(execution);
+                break;
+            case DEPLOYING:
+                pendingDeployments++;
+                break;
+            case INITIALIZING:
+                initializingDeployments++;
+                break;
+            case RUNNING:
+                completedDeployments++;
+                break;
+            default:
+                // the deployment started terminating
+                expectedDeployments.remove(execution);
+        }
+        switch (previousState) {
+            case DEPLOYING:
+                pendingDeployments--;
+                break;
+            case INITIALIZING:
+                initializingDeployments--;
+                break;
+            case RUNNING:
+                completedDeployments--;
+                break;
+        }
+
+        Predicate<Pair<Integer, Integer>> auxillaryStartPredicate =
+                MetricsPredicateProvider.getAuxillaryStartPredicate(currentState);
+        Predicate<Integer> auxillaryEndPredicate =
+                MetricsPredicateProvider.getAuxillaryEndPredicate();

Review Comment:
   There's no need to recreate these on each state transition; set them up in the constructor.
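
   A minimal sketch of that idea, using hypothetical names rather than the PR's classes: `PredicateSetupSketch` stands in for `SubStateTimeMetrics`, and `createStartPredicate()` stands in for a `MetricsPredicateProvider` lookup. The predicate is built once in the constructor and only evaluated on each transition.

   ```java
   import java.util.function.IntPredicate;

   /** Hypothetical, simplified stand-in for SubStateTimeMetrics; not the PR's code. */
   final class PredicateSetupSketch {
       // Counts how often the (hypothetical) predicate factory is invoked.
       static int factoryCalls = 0;

       // Stand-in for a MetricsPredicateProvider lookup.
       static IntPredicate createStartPredicate() {
           factoryCalls++;
           return pendingDeployments -> pendingDeployments > 0;
       }

       // Built once in the constructor, reused on every state transition.
       private final IntPredicate startPredicate;

       private int pendingDeployments = 0;
       private boolean started = false;

       PredicateSetupSketch() {
           this.startPredicate = createStartPredicate();
       }

       // Called for each transition into DEPLOYING; only evaluates the stored predicate.
       void onDeploying() {
           pendingDeployments++;
           if (!started && startPredicate.test(pendingDeployments)) {
               started = true;
           }
       }

       boolean isStarted() {
           return started;
       }

       public static void main(String[] args) {
           PredicateSetupSketch sketch = new PredicateSetupSketch();
           sketch.onDeploying();
           sketch.onDeploying();
           System.out.println("started=" + sketch.isStarted() + ", factoryCalls=" + factoryCalls);
       }
   }
   ```

   In this sketch two transitions trigger a single factory call; the same shape should carry over to the auxiliary predicates, assuming they do not depend on per-transition state.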



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/metrics/SubStateTimeMetrics.java:
##########
@@ -0,0 +1,186 @@
+    public SubStateTimeMetrics(
+            JobType semantic,
+            MetricOptions.JobStatusMetricsSettings stateTimeMetricSettings,
+            ExecutionState currentState,

Review Comment:
   ```suggestion
               ExecutionState targetState,
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/metrics/SubStateTimeMetrics.java:
##########
@@ -0,0 +1,186 @@
+public class SubStateTimeMetrics
+        implements ExecutionStateUpdateListener, StateTimeMetric, MetricsRegistrar {
+    private static final long NOT_STARTED = -1L;
+
+    private final JobType semantic;

Review Comment:
   ```suggestion
   ```
   unused



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/metrics/utils/MetricsPredicateProvider.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.metrics.utils;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobType;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.Set;
+import java.util.function.Predicate;
+
+/** Helper class which provides relevant predicates for state metrics. */
+public class MetricsPredicateProvider {
+
+    /**
+     * Get the predicate which indicates start of metric collection.
+     *
+     * @param semantic Job type
+     * @param executionState Execution state of the job
+     */
+    public static Predicate<JobExecutionStatsHolder> getStartPredicate(
+            JobType semantic, ExecutionState executionState) {
+        Predicate<JobExecutionStatsHolder> resultPredicate;
+        if (semantic == JobType.BATCH) {
+            if (executionState == ExecutionState.INITIALIZING) {
+                // For batch, if there is no task running and at least one task is initializing,
+                // the job is initializing
+                resultPredicate =
+                        jobExecutionStatsHolder ->
+                                jobExecutionStatsHolder.getCompletedDeployments() == 0;
+            } else if (executionState == ExecutionState.DEPLOYING) {
+                // If the job type is batch:
+                // Deployment is started iff there is at least one task in deploying state and no
+                // task is either initializing or running.
+                resultPredicate =
+                        deploymentPair ->
+                                deploymentPair.getInitializingDeployments() == 0
+                                        && deploymentPair.getCompletedDeployments() == 0;
+            } else {
+                throw new RuntimeException("Illegal State Passed");
+            }
+        } else {
+            if (executionState == ExecutionState.INITIALIZING) {
+                // For streaming, if there is no task which is deploying and at least one task is
+                // initializing, the job is initializing
+                resultPredicate =
+                        jobExecutionStatsHolder ->
+                                jobExecutionStatsHolder.getPendingDeployments() == 0;
+            } else if (executionState == ExecutionState.DEPLOYING) {
+                // If the job type is streaming:
+                // Deployment is measured from the start of the first deployment
+                resultPredicate = deploymentPair -> true;
+            } else {
+                throw new RuntimeException("Illegal State Passed");
+            }
+        }
+
+        return resultPredicate;
+    }
+
+    /**
+     * Get the predicate which marks end of the metrics collection.
+     *
+     * @param semantic job type
+     * @param executionState current state of the job
+     * @param expectedDeployments Expected deployments count
+     */
+    public static Predicate<JobExecutionStatsHolder> getEndPredicate(
+            JobType semantic, ExecutionState executionState, Set expectedDeployments) {
+        Predicate<JobExecutionStatsHolder> resultPredicate;
+        if (semantic == JobType.BATCH) {
+            if (executionState == ExecutionState.INITIALIZING) {
+                resultPredicate =
+                        jobExecutionStats -> jobExecutionStats.getCompletedDeployments() > 0;
+            } else if (executionState == ExecutionState.DEPLOYING) {
+                resultPredicate =
+                        jobExecutionStatsHolder ->
+                                jobExecutionStatsHolder.getInitializingDeployments() > 0
+                                        || jobExecutionStatsHolder.getCompletedDeployments() > 0;
+            } else {
+                throw new RuntimeException("Illegal State Passed");
+            }
+        } else {
+            if (executionState == ExecutionState.INITIALIZING) {
+                resultPredicate =
+                        jobExecutionStatsHolder ->
+                                jobExecutionStatsHolder.getCompletedDeployments()
+                                        == expectedDeployments.size();
+            } else if (executionState == ExecutionState.DEPLOYING) {
+                resultPredicate =
+                        jobExecutionStatsHolder ->
+                                (jobExecutionStatsHolder.getInitializingDeployments()
+                                                + jobExecutionStatsHolder.getCompletedDeployments())
+                                        == expectedDeployments.size();
+            } else {
+                throw new RuntimeException("Illegal State Passed");
+            }
+        }
+
+        return resultPredicate;
+    }
+
+    /**
+     * Return the auxiliary predicate involved in the metric start. NOTE: The responsibility of
+     * passing in the correct input parameter lies with the caller.
+     *
+     * @param executionState Current state of the job
+     */
+    // NOTE: The responsibility of passing in the correct input parameter lies with the caller
+    public static Predicate<Pair<Integer, Integer>> getAuxillaryStartPredicate(
+            ExecutionState executionState) {
+        Predicate<Pair<Integer, Integer>> resultPredicate;
+        if (executionState == ExecutionState.DEPLOYING) {

Review Comment:
   I remember that I suggested a generic implementation that bases everything on a triplet (preceding_states, target_states, following_states); why did you opt for hard-coding the predicates for each state? AFAICT this will make it more difficult to extend the set of metrics in the future (e.g., for runningTime or cancellingTime)
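
   To make the triplet idea concrete, here is a rough sketch of one possible reading. Everything in it is an assumption: the `State` enum is a reduced stand-in for `ExecutionState`, `StateTripletSketch` and its methods are hypothetical names, and the batch/streaming rules only loosely mirror the comments in the quoted `MetricsPredicateProvider`.

   ```java
   import java.util.EnumMap;
   import java.util.EnumSet;
   import java.util.Map;
   import java.util.Set;

   /** Hypothetical sketch; none of these names exist in Flink or in the PR. */
   final class StateTripletSketch {
       /** Reduced stand-in for Flink's ExecutionState. */
       enum State { SCHEDULED, DEPLOYING, INITIALIZING, RUNNING }

       private final Set<State> preceding;
       private final Set<State> target;
       private final Set<State> following;

       StateTripletSketch(Set<State> preceding, Set<State> target, Set<State> following) {
           this.preceding = preceding;
           this.target = target;
           this.following = following;
       }

       /** Sums the number of executions currently in any of the given states. */
       private static int count(Map<State, Integer> executionsPerState, Set<State> states) {
           int sum = 0;
           for (State state : states) {
               sum += executionsPerState.getOrDefault(state, 0);
           }
           return sum;
       }

       /** Assumed batch rule: some execution is in a target state, none is further along. */
       boolean isActiveForBatch(Map<State, Integer> executionsPerState) {
           return count(executionsPerState, target) > 0
                   && count(executionsPerState, following) == 0;
       }

       /** Assumed streaming rule: some execution is in a target state, none lags behind. */
       boolean isActiveForStreaming(Map<State, Integer> executionsPerState) {
           return count(executionsPerState, target) > 0
                   && count(executionsPerState, preceding) == 0;
       }

       /** The triplet an initialization-time metric might use. */
       static StateTripletSketch initializing() {
           return new StateTripletSketch(
                   EnumSet.of(State.DEPLOYING),
                   EnumSet.of(State.INITIALIZING),
                   EnumSet.of(State.RUNNING));
       }

       public static void main(String[] args) {
           Map<State, Integer> counts = new EnumMap<>(State.class);
           counts.put(State.DEPLOYING, 1);
           counts.put(State.INITIALIZING, 1);
           StateTripletSketch metric = initializing();
           System.out.println("batch: " + metric.isActiveForBatch(counts));
           System.out.println("streaming: " + metric.isActiveForStreaming(counts));
       }
   }
   ```

   With one execution deploying and one initializing, the batch rule reports the metric as active and the streaming rule does not. Under this reading, a future metric such as runningTime would only need another triplet rather than new hard-coded branches.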



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/metrics/utils/MetricsPredicateProvider.java:
##########
@@ -0,0 +1,147 @@
+    /**
+     * Return the auxillary predicate involved in the metric start NOTE: The responsibility of

Review Comment:
   It's not clear what this predicate does, or why separate predicates are required.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/metrics/MetricsWrapperTest.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.metrics;
+
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobType;
+
+import org.junit.Test;

Review Comment:
   New tests should be written with JUnit 5 (which in this case means importing `org.junit.jupiter.api.Test` instead and removing the `public` modifier from the class/methods).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/metrics/utils/MetricsPredicateProvider.java:
##########
@@ -0,0 +1,147 @@
+/** Helper class which provides relevant predicates for state metrics. */
+public class MetricsPredicateProvider {

Review Comment:
   This class doesn't scale well into the future; it will quickly become unreadable should more states be added.
   
   Consider creating some static predicates, with each only caring about a single state. Then you also don't have to worry about incorrect target states.
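
   As an illustration only, single-state predicates could be composed like this. `Stats` is a hypothetical, reduced counterpart of `JobExecutionStatsHolder`, and the constant names are invented; the two composed predicates follow the batch deployment start/end conditions quoted above.

   ```java
   import java.util.function.Predicate;

   /** Hypothetical sketch of static, single-state predicates; not the PR's code. */
   final class PerStatePredicatesSketch {
       /** Reduced, hypothetical counterpart of JobExecutionStatsHolder. */
       static final class Stats {
           final int pending;
           final int initializing;
           final int completed;

           Stats(int pending, int initializing, int completed) {
               this.pending = pending;
               this.initializing = initializing;
               this.completed = completed;
           }
       }

       // Each building block looks at exactly one counter.
       static final Predicate<Stats> NONE_DEPLOYING = stats -> stats.pending == 0;
       static final Predicate<Stats> NONE_INITIALIZING = stats -> stats.initializing == 0;
       static final Predicate<Stats> NONE_RUNNING = stats -> stats.completed == 0;

       // Batch deployment may start while nothing is initializing or running.
       static final Predicate<Stats> BATCH_DEPLOYMENT_START = NONE_INITIALIZING.and(NONE_RUNNING);

       // It ends once anything is initializing or running, i.e. the negation of the start.
       static final Predicate<Stats> BATCH_DEPLOYMENT_END = BATCH_DEPLOYMENT_START.negate();

       public static void main(String[] args) {
           Stats onlyDeploying = new Stats(1, 0, 0);
           Stats oneInitializing = new Stats(0, 1, 0);
           System.out.println("start: " + BATCH_DEPLOYMENT_START.test(onlyDeploying));
           System.out.println("end: " + BATCH_DEPLOYMENT_END.test(oneInitializing));
       }
   }
   ```

   Because every metric picks its own composition, there is no `executionState` argument left that could be passed incorrectly.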





[GitHub] [flink] atris commented on pull request #20187: [FLINK-27473] Introduce Generic Metrics For Job States

Posted by GitBox <gi...@apache.org>.
atris commented on PR #20187:
URL: https://github.com/apache/flink/pull/20187#issuecomment-1221583085

   @zentol Ping. Please let me know your thoughts and comments.




[GitHub] [flink] atris commented on a diff in pull request #20187: [FLINK-27473] Introduce Generic Metrics For Job States

Posted by GitBox <gi...@apache.org>.
atris commented on code in PR #20187:
URL: https://github.com/apache/flink/pull/20187#discussion_r916597606


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -279,14 +279,13 @@ public AdaptiveScheduler(
         final MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings =
                 MetricOptions.JobStatusMetricsSettings.fromConfiguration(configuration);
 
-        deploymentTimeMetrics =
-                new DeploymentStateTimeMetrics(jobGraph.getJobType(), jobStatusMetricsSettings);
+        metricsWrapper = new MetricsWrapper(jobGraph.getJobType(), jobStatusMetricsSettings);

Review Comment:
   Thanks for fixing it!





[GitHub] [flink] atris commented on a diff in pull request #20187: [FLINK-27473] Introduce Generic Metrics For Job States

Posted by GitBox <gi...@apache.org>.
atris commented on code in PR #20187:
URL: https://github.com/apache/flink/pull/20187#discussion_r916570822


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/metrics/SubStateTimeMetricsTest.java:
##########
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.metrics;
+
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
+import static org.apache.flink.runtime.scheduler.metrics.StateTimeMetricTest.enable;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for SubStateTime metrics. */
+public class SubStateTimeMetricsTest {
+    private static final MetricOptions.JobStatusMetricsSettings settings =
+            enable(
+                    MetricOptions.JobStatusMetrics.STATE,
+                    MetricOptions.JobStatusMetrics.CURRENT_TIME,
+                    MetricOptions.JobStatusMetrics.TOTAL_TIME);
+
+    @Test
+    void testInitialValues() {
+        final ManualClock clock = new ManualClock(Duration.ofMillis(5).toNanos());
+
+        final SubStateTimeMetrics initializationStateTimeMetrics =
+                new SubStateTimeMetrics(
+                        JobType.BATCH, settings, ExecutionState.DEPLOYING, clock, "deploying");
+
+        assertThat(initializationStateTimeMetrics.getCurrentTime()).isEqualTo(0L);
+        assertThat(initializationStateTimeMetrics.getTotalTime()).isEqualTo(0L);
+        assertThat(initializationStateTimeMetrics.getBinary()).isEqualTo(0L);
+    }
+
+    @Test
+    void testInitializingOnFirstInitialization() {
+        final SubStateTimeMetrics metrics =
+                new SubStateTimeMetrics(
+                        JobType.BATCH, settings, ExecutionState.INITIALIZING, "initialising");
+
+        final ExecutionAttemptID id1 = createExecutionAttemptId();
+
+        metrics.onStateUpdate(id1, ExecutionState.CREATED, ExecutionState.SCHEDULED);
+        assertThat(metrics.getBinary()).isEqualTo(0L);
+
+        metrics.onStateUpdate(id1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
+        assertThat(metrics.getBinary()).isEqualTo(0L);
+
+        metrics.onStateUpdate(id1, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
+        assertThat(metrics.getBinary()).isEqualTo(1L);
+    }
+
+    @Test
+    void testInitializingStart_batch_notTriggeredIfOneDeploymentIsRunning() {
+        final SubStateTimeMetrics metrics =
+                new SubStateTimeMetrics(
+                        JobType.BATCH, settings, ExecutionState.INITIALIZING, "initialising");
+
+        final ExecutionAttemptID id1 = createExecutionAttemptId();
+
+        metrics.onStateUpdate(id1, ExecutionState.CREATED, ExecutionState.SCHEDULED);
+        metrics.onStateUpdate(id1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
+        metrics.onStateUpdate(id1, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
+        metrics.onStateUpdate(id1, ExecutionState.INITIALIZING, ExecutionState.RUNNING);
+
+        final ExecutionAttemptID id2 = createExecutionAttemptId();
+
+        metrics.onStateUpdate(id2, ExecutionState.CREATED, ExecutionState.SCHEDULED);
+        metrics.onStateUpdate(id2, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
+        metrics.onStateUpdate(id2, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
+
+        assertThat(metrics.getBinary()).isEqualTo(0L);
+    }
+
+    @Test
+    void testOneInitializingAndOneDeploying_batch() {
+        final SubStateTimeMetrics metrics =
+                new SubStateTimeMetrics(
+                        JobType.BATCH, settings, ExecutionState.INITIALIZING, "initialising");
+
+        final ExecutionAttemptID id1 = createExecutionAttemptId();
+        final ExecutionAttemptID id2 = createExecutionAttemptId();
+
+        metrics.onStateUpdate(id1, ExecutionState.CREATED, ExecutionState.SCHEDULED);
+        metrics.onStateUpdate(id2, ExecutionState.CREATED, ExecutionState.SCHEDULED);
+
+        metrics.onStateUpdate(id1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
+        metrics.onStateUpdate(id2, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
+
+        metrics.onStateUpdate(id1, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
+        assertThat(metrics.getBinary()).isEqualTo(1L);
+    }
+
+    @Test
+    void testOneInitializingAndOneDeploying_streaming() {
+        final SubStateTimeMetrics metrics =
+                new SubStateTimeMetrics(
+                        JobType.STREAMING, settings, ExecutionState.INITIALIZING, "initialising");
+
+        final ExecutionAttemptID id1 = createExecutionAttemptId();
+        final ExecutionAttemptID id2 = createExecutionAttemptId();
+
+        metrics.onStateUpdate(id1, ExecutionState.CREATED, ExecutionState.SCHEDULED);
+        metrics.onStateUpdate(id2, ExecutionState.CREATED, ExecutionState.SCHEDULED);
+
+        metrics.onStateUpdate(id1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
+        metrics.onStateUpdate(id2, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
+
+        metrics.onStateUpdate(id1, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
+        assertThat(metrics.getBinary()).isEqualTo(0L);
+
+        metrics.onStateUpdate(id2, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
+        assertThat(metrics.getBinary()).isEqualTo(1L);
+    }
+
+    @Test
+    void testInitialzingEnd_streaming_ignoresTerminalDeployments() {

Review Comment:
   I think this test is the only place where we test the specific transitions for each supported metric, and it is also where the specifics of any future metrics will live.
   
   The test is valuable because it lets us check metric-level correctness while also exercising the entire chain (SubStateTimeMetrics, MetricsProvider).
   
   The duplication exists because the state transitions and expected values differ between the metrics.
   
   Unless you have a strong objection, I would rather retain this test.
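
To illustrate why the expected values differ per metric, here is a minimal, self-contained sketch. It is not the PR's implementation: `ToyBinaryMetric`, `Transition`, `State` and `replay` are hypothetical names, and the toy rule (report 1 while any attempt is in the target state) deliberately ignores the batch/streaming distinction that SubStateTimeMetrics makes. It only shows that replaying the same transition script against two target states yields different results, which is what each per-metric test has to spell out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ToyStateMetricDemo {

    // Hypothetical stand-in for ExecutionState; only the states used in the tests above.
    enum State { CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING }

    // One recorded transition of a single execution attempt (IDs are plain strings here).
    static final class Transition {
        final String attemptId;
        final State from;
        final State to;

        Transition(String attemptId, State from, State to) {
            this.attemptId = attemptId;
            this.from = from;
            this.to = to;
        }
    }

    // Toy rule (an assumption, NOT the SubStateTimeMetrics logic):
    // the metric is 1 while at least one attempt is currently in the target state.
    static final class ToyBinaryMetric {
        private final State target;
        private final Map<String, State> current = new HashMap<>();

        ToyBinaryMetric(State target) {
            this.target = target;
        }

        void onStateUpdate(Transition t) {
            // Remember only the latest state of each attempt.
            current.put(t.attemptId, t.to);
        }

        long getBinary() {
            return current.containsValue(target) ? 1L : 0L;
        }
    }

    // Replays a transition script against a fresh metric for the given target state.
    static long replay(State target, List<Transition> script) {
        ToyBinaryMetric metric = new ToyBinaryMetric(target);
        for (Transition t : script) {
            metric.onStateUpdate(t);
        }
        return metric.getBinary();
    }

    public static void main(String[] args) {
        List<Transition> script = Arrays.asList(
                new Transition("a1", State.CREATED, State.SCHEDULED),
                new Transition("a1", State.SCHEDULED, State.DEPLOYING),
                new Transition("a1", State.DEPLOYING, State.INITIALIZING));

        // Same script, different target state, different expected value.
        System.out.println("deploying=" + replay(State.DEPLOYING, script));
        System.out.println("initializing=" + replay(State.INITIALIZING, script));
    }
}
```

With this script the toy deployment metric ends at 0 and the toy initialization metric ends at 1, so a shared table of expected values would not work even in this simplified model.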





[GitHub] [flink] atris commented on pull request #20187: [FLINK-27473] Introduce Generic Metrics For Job States

Posted by "atris (via GitHub)" <gi...@apache.org>.
atris commented on PR #20187:
URL: https://github.com/apache/flink/pull/20187#issuecomment-1648437676

   @zentol Somehow this PR fell off my radar. I have updated it now with your commit -- can we please proceed?




[GitHub] [flink] flinkbot commented on pull request #20187: [FLINK-27473] Introduce Generic Metrics For Job States

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20187:
URL: https://github.com/apache/flink/pull/20187#issuecomment-1176536725

   ## CI report:
   
   * 94ebb9c61ad7729a860ef60f811c62eed28fe10e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] atris commented on pull request #20187: [FLINK-27473] Introduce Generic Metrics For Job States

Posted by GitBox <gi...@apache.org>.
atris commented on PR #20187:
URL: https://github.com/apache/flink/pull/20187#issuecomment-1178744011

   @zentol Thank you for reviewing the changes. I have pushed a new iteration; please take a look and let me know your thoughts.

