Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/13 09:02:03 UTC

[GitHub] [flink] atris opened a new pull request, #19943: FLINK-27473: Capture Time That Job Spends In Initialization

atris opened a new pull request, #19943:
URL: https://github.com/apache/flink/pull/19943

   This commit introduces a new metric that tracks the time a job spends initializing. The existing deployment state time metrics are modified to recognize this new state.
   
   The ExecutionGraphFactory API is extended slightly so that future state time metrics can be added more easily.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zentol commented on a diff in pull request #19943: [FLINK-27473] Capture Time That Job Spends In Initialization

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19943:
URL: https://github.com/apache/flink/pull/19943#discussion_r897654533


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/metrics/DeploymentStateTimeMetrics.java:
##########
@@ -75,12 +78,25 @@ public DeploymentStateTimeMetrics(
         this.clock = clock;
 
         if (semantic == JobType.BATCH) {
-            deploymentStartPredicate = completedDeployments -> completedDeployments == 0;
-            deploymentEndPredicate = completedDeployments -> completedDeployments > 0;
+            // If the job type is batch:
+            // Deployment is started iff there is at least one task in deploying state and no task
+            // is either initializing or running.
+            // Deployment is completed iff any task is in either initializing or deploying state
+            deploymentStartPredicate =
+                    deploymentPair ->
+                            deploymentPair.getLeft() == 0 && deploymentPair.getRight() == 0;

Review Comment:
   I don't think we need the Pair here. We don't really differentiate between initializing/deploying; in all cases the sum of them would work just fine.
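
   A minimal sketch of why a single summed count can stand in for the Pair, assuming both values are non-negative task counters. The class and member names below are hypothetical and not part of the PR:

```java
import java.util.function.IntPredicate;

/** Hypothetical illustration; not code from the PR. */
final class SummedCounts {
    // Pair-style check: both counters must be zero.
    static boolean bothZero(int first, int second) {
        return first == 0 && second == 0;
    }

    // Suggested shape: a single predicate over the sum of the two counters.
    static final IntPredicate SUM_IS_ZERO = sum -> sum == 0;

    public static void main(String[] args) {
        // For non-negative counters the two formulations always agree.
        for (int first = 0; first <= 3; first++) {
            for (int second = 0; second <= 3; second++) {
                if (bothZero(first, second) != SUM_IS_ZERO.test(first + second)) {
                    throw new AssertionError(first + ", " + second);
                }
            }
        }
        System.out.println("equivalent for all tested counts");
    }
}
```

   With a summed count the predicate type can stay a plain integer predicate, as it was before this change.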



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/metrics/InitializationStateTimeMetrics.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.scheduler.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/**
+ * Metrics that capture how long a job spent in initialization.
+ *
+ * <p>These metrics differentiate between batch & streaming use-cases:
+ *
+ * <p>Batch: Measures from the start of the first initialization until the first task is running.
+ * From that point the job is making progress.
+ *
+ * <p>Streaming: Measures from the start of the first initialization until all tasks are running.
+ * From that point on checkpoints can be triggered, and thus progress be made.
+ */
+public class InitializationStateTimeMetrics
+        implements ExecutionStateUpdateListener, StateTimeMetric, MetricsRegistrar {
+    private static final long NOT_STARTED = -1L;
+
+    private final Predicate<Pair<Integer, Integer>> initializationStartPredicate;
+    private final Predicate<Integer> initializationEndPredicate;
+    private final MetricOptions.JobStatusMetricsSettings stateTimeMetricsSettings;
+    private final Clock clock;
+
+    // book-keeping
+    private final Set<ExecutionAttemptID> expectedDeployments = new HashSet<>();
+    private int pendingDeployments = 0;
+    private int initializingDeployments = 0;
+    private int completedDeployments = 0;
+
+    // metrics state
+    private long initializationStart = NOT_STARTED;
+    private long initializationTimeTotal = 0L;
+
+    public InitializationStateTimeMetrics(
+            JobType semantic, MetricOptions.JobStatusMetricsSettings stateTimeMetricsSettings) {
+        this(semantic, stateTimeMetricsSettings, SystemClock.getInstance());
+    }
+
+    @VisibleForTesting
+    InitializationStateTimeMetrics(
+            JobType semantic,
+            MetricOptions.JobStatusMetricsSettings stateTimeMetricsSettings,
+            Clock clock) {
+        this.stateTimeMetricsSettings = stateTimeMetricsSettings;
+        this.clock = clock;
+
+        if (semantic == JobType.BATCH) {
+            // For batch, if there is no task running and at least one task is initializing, the
+            // job is initializing
+            initializationStartPredicate = deploymentPair -> deploymentPair.getLeft() == 0;
+            initializationEndPredicate = completedDeployments -> completedDeployments > 0;
+        } else {
+            // For streaming, if there is no task which is deploying and at least one task is
+            // initializing, the job is initializing
+            initializationStartPredicate = deploymentPair -> deploymentPair.getRight() == 0;
+            initializationEndPredicate =
+                    completedDeployments -> completedDeployments == expectedDeployments.size();
+        }
+    }
+
+    @Override
+    public long getCurrentTime() {
+        return initializationStart == NOT_STARTED
+                ? 0L
+                : Math.max(0, clock.absoluteTimeMillis() - initializationStart);
+    }
+
+    @Override
+    public long getTotalTime() {
+        return getCurrentTime() + initializationTimeTotal;
+    }
+
+    @Override
+    public long getBinary() {
+        return initializationStart == NOT_STARTED ? 0L : 1L;
+    }
+
+    @Override
+    public void registerMetrics(MetricGroup metricGroup) {
+        StateTimeMetric.register(stateTimeMetricsSettings, metricGroup, this, "initializing");
+    }

Review Comment:
   We could think about sharing some code here. All of this is identical to DeploymentStateTimeMetrics.
   The state handling can be generalized to be based on a triplet (previous states, target state, next states).
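
   One possible reading of the triplet idea, sketched under stated assumptions. The `TripletStateTimer` class, its simplified `State` enum, the string task IDs, and the explicit `nowMillis` argument are all hypothetical and stand in for Flink's own types. The batch and streaming rules are taken from the code comments in the hunk above: batch counts time while some task is in the target state and none is in a next state; streaming counts time while some task is in the target state and none is still in a previous state.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of a triplet-based state timer; not code from the PR or from Flink. */
final class TripletStateTimer {
    /** Simplified stand-in for the task states discussed in this thread. */
    enum State { DEPLOYING, INITIALIZING, RUNNING }

    private static final long NOT_STARTED = -1L;

    private final Set<State> previousStates;
    private final State targetState;
    private final Set<State> nextStates;
    private final boolean batch;
    private final Map<String, State> taskStates = new HashMap<>();

    private long start = NOT_STARTED;
    private long totalTime = 0L;

    TripletStateTimer(
            Set<State> previousStates, State targetState, Set<State> nextStates, boolean batch) {
        this.previousStates = previousStates;
        this.targetState = targetState;
        this.nextStates = nextStates;
        this.batch = batch;
    }

    /** Records a task's new state and starts or stops the timer accordingly. */
    void onStateUpdate(String taskId, State newState, long nowMillis) {
        taskStates.put(taskId, newState);
        boolean anyInTarget = taskStates.containsValue(targetState);
        // Batch: a task in any next state blocks the timer.
        // Streaming: a task still in any previous state blocks the timer.
        Set<State> blockingStates = batch ? nextStates : previousStates;
        boolean blocked = taskStates.values().stream().anyMatch(blockingStates::contains);
        boolean active = anyInTarget && !blocked;

        if (active && start == NOT_STARTED) {
            start = nowMillis;
        } else if (!active && start != NOT_STARTED) {
            totalTime += Math.max(0L, nowMillis - start);
            start = NOT_STARTED;
        }
    }

    /** Accumulated time in the target state, including the currently running interval. */
    long getTotalTime(long nowMillis) {
        long current = start == NOT_STARTED ? 0L : Math.max(0L, nowMillis - start);
        return totalTime + current;
    }

    public static void main(String[] args) {
        TripletStateTimer timer =
                new TripletStateTimer(
                        EnumSet.of(State.DEPLOYING),
                        State.INITIALIZING,
                        EnumSet.of(State.RUNNING),
                        false);
        timer.onStateUpdate("a", State.DEPLOYING, 0L);
        timer.onStateUpdate("b", State.DEPLOYING, 0L);
        timer.onStateUpdate("a", State.INITIALIZING, 10L);
        timer.onStateUpdate("b", State.INITIALIZING, 20L);
        timer.onStateUpdate("a", State.RUNNING, 30L);
        timer.onStateUpdate("b", State.RUNNING, 50L);
        System.out.println(timer.getTotalTime(60L));
    }
}
```

   Under this reading, a deployment timer and an initialization timer would differ only in the triplet they are constructed with, which is where the shared code would come from.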



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/metrics/InitializationStateTimeMetrics.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.scheduler.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/**
+ * Metrics that capture how long a job spent in initialization.
+ *
+ * <p>These metrics differentiate between batch & streaming use-cases:
+ *
+ * <p>Batch: Measures from the start of the first initialization until the first task is running.
+ * From that point the job is making progress.
+ *
+ * <p>Streaming: Measures from the start of the first initialization until all tasks are running.
+ * From that point on checkpoints can be triggered, and thus progress be made.
+ */
+public class InitializationStateTimeMetrics
+        implements ExecutionStateUpdateListener, StateTimeMetric, MetricsRegistrar {
+    private static final long NOT_STARTED = -1L;
+
+    private final Predicate<Pair<Integer, Integer>> initializationStartPredicate;
+    private final Predicate<Integer> initializationEndPredicate;
+    private final MetricOptions.JobStatusMetricsSettings stateTimeMetricsSettings;
+    private final Clock clock;
+
+    // book-keeping
+    private final Set<ExecutionAttemptID> expectedDeployments = new HashSet<>();
+    private int pendingDeployments = 0;
+    private int initializingDeployments = 0;
+    private int completedDeployments = 0;
+
+    // metrics state
+    private long initializationStart = NOT_STARTED;
+    private long initializationTimeTotal = 0L;
+
+    public InitializationStateTimeMetrics(
+            JobType semantic, MetricOptions.JobStatusMetricsSettings stateTimeMetricsSettings) {
+        this(semantic, stateTimeMetricsSettings, SystemClock.getInstance());
+    }
+
+    @VisibleForTesting
+    InitializationStateTimeMetrics(
+            JobType semantic,
+            MetricOptions.JobStatusMetricsSettings stateTimeMetricsSettings,
+            Clock clock) {
+        this.stateTimeMetricsSettings = stateTimeMetricsSettings;
+        this.clock = clock;
+
+        if (semantic == JobType.BATCH) {
+            // For batch, if there is no task running and at least one task is initializing, the
+            // job is initializing
+            initializationStartPredicate = deploymentPair -> deploymentPair.getLeft() == 0;

Review Comment:
   There should also be no running task.
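
   A minimal sketch of a batch start condition that checks both requirements named in the code comment. The argument order (initializing tasks, running tasks) and the class name are assumptions, since the quoted hunk does not show how the PR's Pair is populated:

```java
import java.util.function.BiPredicate;

/** Hypothetical illustration; not code from the PR. */
final class BatchInitializationStart {
    // Assumed arguments: (initializingTasks, runningTasks).
    // The job counts as initializing only if some task is initializing and none is running.
    static final BiPredicate<Integer, Integer> START =
            (initializingTasks, runningTasks) -> initializingTasks > 0 && runningTasks == 0;

    public static void main(String[] args) {
        System.out.println(START.test(1, 0)); // one task initializing, none running
        System.out.println(START.test(1, 1)); // a task is already running
    }
}
```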





[GitHub] [flink] atris closed pull request #19943: [FLINK-27473] Capture Time That Job Spends In Initialization

Posted by GitBox <gi...@apache.org>.
atris closed pull request #19943: [FLINK-27473] Capture Time That Job Spends In Initialization 
URL: https://github.com/apache/flink/pull/19943




[GitHub] [flink] flinkbot commented on pull request #19943: FLINK-27473: Capture Time That Job Spends In Initialization

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19943:
URL: https://github.com/apache/flink/pull/19943#issuecomment-1153665915

   ## CI report:
   
   * 7191606d428fe2e5aaf0734e3328e6f993ce477b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] atris commented on pull request #19943: [FLINK-27473] Capture Time That Job Spends In Initialization

Posted by GitBox <gi...@apache.org>.
atris commented on PR #19943:
URL: https://github.com/apache/flink/pull/19943#issuecomment-1176533661

   Superseded by #20187 




[GitHub] [flink] zentol commented on a diff in pull request #19943: [FLINK-27473] Capture Time That Job Spends In Initialization

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19943:
URL: https://github.com/apache/flink/pull/19943#discussion_r897663259


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/metrics/DeploymentStateTimeMetrics.java:
##########
@@ -75,12 +78,25 @@ public DeploymentStateTimeMetrics(
         this.clock = clock;
 
         if (semantic == JobType.BATCH) {
-            deploymentStartPredicate = completedDeployments -> completedDeployments == 0;
-            deploymentEndPredicate = completedDeployments -> completedDeployments > 0;
+            // If the job type is batch:
+            // Deployment is started iff there is at least one task in deploying state and no task
+            // is either initializing or running.
+            // Deployment is completed iff any task is in either initializing or deploying state
+            deploymentStartPredicate =
+                    deploymentPair ->
+                            deploymentPair.getLeft() == 0 && deploymentPair.getRight() == 0;

Review Comment:
   Note that this comment is irrelevant if we generalize things a bit.





[GitHub] [flink] atris commented on pull request #19943: [FLINK-27473] Capture Time That Job Spends In Initialization

Posted by GitBox <gi...@apache.org>.
atris commented on PR #19943:
URL: https://github.com/apache/flink/pull/19943#issuecomment-1156153163

   @zentol Thanks for taking a look.
   
   Could you elaborate a bit on what you have in mind in terms of generalizing? Are you thinking of a single state time metric that handles all states (deploying, initializing and running)?

