Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/20 10:27:47 UTC

[GitHub] [flink] zhuzhurk opened a new pull request, #20321: [FLINK-28138] Add metrics for speculative execution

zhuzhurk opened a new pull request, #20321:
URL: https://github.com/apache/flink/pull/20321

   ## What is the purpose of the change
   
   This PR adds two metrics to expose job problems and show the effectiveness of speculative execution:
    - numSlowExecutionVertices: Number of slow execution vertices at the moment.
    - numEffectiveSpeculativeExecutions: Number of speculative executions which finish before their corresponding original executions finish.
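
The difference between the two metrics listed above can be sketched in plain Java. This is only an illustration under stated assumptions, not the code in this PR: the class `SpeculativeMetricsSketch`, its methods, and the use of `String` vertex IDs are all hypothetical, and `LongSupplier` and `AtomicLong` stand in for Flink's gauge and counter types. The first metric reflects the current state and can go down again, while the second one only accumulates.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the scheduler's metric bookkeeping; not Flink code. */
class SpeculativeMetricsSketch {

    // Current set of slow vertices; replaced on every detector report.
    private Set<String> slowVertices = new HashSet<>();

    // Accumulated value; it is only ever incremented.
    private final AtomicLong numEffectiveSpeculativeExecutions = new AtomicLong();

    // Gauge-like view: evaluated each time the metric is read.
    final LongSupplier numSlowExecutionVertices = () -> slowVertices.size();

    /** Each report carries the full set of currently slow vertices (assumption). */
    void notifySlowVertices(Collection<String> vertexIds) {
        slowVertices = new HashSet<>(vertexIds);
    }

    /** Counts a speculative execution that finished before its original one. */
    void onExecutionFinished(boolean speculative, boolean originalAlreadyFinished) {
        if (speculative && !originalAlreadyFinished) {
            numEffectiveSpeculativeExecutions.incrementAndGet();
        }
    }

    long getNumEffectiveSpeculativeExecutions() {
        return numEffectiveSpeculativeExecutions.get();
    }

    public static void main(String[] args) {
        SpeculativeMetricsSketch metrics = new SpeculativeMetricsSketch();

        metrics.notifySlowVertices(List.of("v1"));
        metrics.notifySlowVertices(List.of("v1")); // reported twice, still one vertex
        System.out.println("slow now: " + metrics.numSlowExecutionVertices.getAsLong());

        metrics.notifySlowVertices(List.of()); // the vertex is no longer slow
        System.out.println("slow now: " + metrics.numSlowExecutionVertices.getAsLong());

        metrics.onExecutionFinished(true, false); // speculative attempt won
        metrics.onExecutionFinished(false, false); // original attempt, not counted
        System.out.println("effective: " + metrics.getNumEffectiveSpeculativeExecutions());
    }
}
```

Reporting the same vertex twice does not change the gauge in this sketch because the vertices are kept in a set.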
   
   ## Verifying this change
   
     - *Added unit tests in SpeculativeSchedulerTest*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zhuzhurk commented on a diff in pull request #20321: [FLINK-28138] Add metrics for speculative execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20321:
URL: https://github.com/apache/flink/pull/20321#discussion_r926576282


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##########
@@ -128,6 +128,12 @@
     /** The unique ID marking the specific execution instant of the task. */
     private final ExecutionAttemptID attemptId;
 
+    /**
+     * This field indicates whether the execution is the original execution of an execution vertex,
+     * i.e. it is created along with the creation or resetting of the execution vertex.
+     */
+    private final boolean original;

Review Comment:
   Good idea!





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20321: [FLINK-28138] Add metrics for speculative execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20321:
URL: https://github.com/apache/flink/pull/20321#discussion_r926555455


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java:
##########
@@ -85,6 +91,10 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler
 
     private final SlowTaskDetector slowTaskDetector;
 
+    private long numSlowExecutionVertices;

Review Comment:
   > I think the numXXX generally represents an accumulated value
   
   Maybe not, since we also have `numRunningJobs`, `numRegisteredTaskManagers`, etc., which report current values.
   I do not have a strong preference here, but maybe we can stick to the name proposed in the FLIP?





[GitHub] [flink] flinkbot commented on pull request #20321: [FLINK-28138] Add metrics for speculative execution

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20321:
URL: https://github.com/apache/flink/pull/20321#issuecomment-1190111998

   ## CI report:
   
   * d0a68832a6f80ce6379db64381a5f95ca99badcb UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] wanglijie95 commented on a diff in pull request #20321: [FLINK-28138] Add metrics for speculative execution

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #20321:
URL: https://github.com/apache/flink/pull/20321#discussion_r926376468


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java:
##########
@@ -149,6 +159,16 @@ public SpeculativeScheduler(
         this.blocklistOperations = checkNotNull(blocklistOperations);
 
         this.slowTaskDetector = new ExecutionTimeBasedSlowTaskDetector(jobMasterConfiguration);
+
+        this.numEffectiveSpeculativeExecutionsCounter = new SimpleCounter();
+        this.registerMetrics(jobManagerJobMetricGroup);

Review Comment:
   I think it would be better to move the registration to `startSchedulingInternal`
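
A minimal sketch of the suggestion above, in plain Java. The comment does not state its reasons, so the motivation given here is an assumption: registering from the constructor hands out a reference to a partially constructed object and exposes the metric before scheduling has begun. `Registry`, `Scheduler`, and `startScheduling` are hypothetical stand-ins, not the Flink metric group or `startSchedulingInternal` itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical sketch of deferring metric registration; not Flink code. */
class DeferredRegistrationSketch {

    /** Minimal stand-in for a metric group that stores named gauges. */
    static final class Registry {
        private final Map<String, LongSupplier> gauges = new LinkedHashMap<>();

        void gauge(String name, LongSupplier gauge) {
            gauges.put(name, gauge);
        }

        boolean contains(String name) {
            return gauges.containsKey(name);
        }

        long read(String name) {
            return gauges.get(name).getAsLong();
        }
    }

    static final class Scheduler {
        private final Registry registry;
        private long numSlowExecutionVertices;

        Scheduler(Registry registry) {
            // Only store the dependency; nothing is registered yet.
            this.registry = registry;
        }

        /** Registration happens once scheduling actually starts. */
        void startScheduling() {
            registry.gauge("numSlowExecutionVertices", () -> numSlowExecutionVertices);
        }

        void setNumSlowExecutionVertices(long value) {
            numSlowExecutionVertices = value;
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        Scheduler scheduler = new Scheduler(registry);
        System.out.println("registered after construction: "
                + registry.contains("numSlowExecutionVertices"));

        scheduler.startScheduling();
        scheduler.setNumSlowExecutionVertices(2);
        System.out.println("registered after start: "
                + registry.contains("numSlowExecutionVertices"));
        System.out.println("value: " + registry.read("numSlowExecutionVertices"));
    }
}
```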



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##########
@@ -128,6 +128,12 @@
     /** The unique ID marking the specific execution instant of the task. */
     private final ExecutionAttemptID attemptId;
 
+    /**
+     * This field indicates whether the execution is the original execution of an execution vertex,
+     * i.e. it is created along with the creation or resetting of the execution vertex.
+     */
+    private final boolean original;

Review Comment:
   Maybe we can add an `originalAttemptNumber` in `SpeculativeExecutionVertex` to record the original execution? Then we don't need to change the logic in `Execution` and `ExecutionVertex`.
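
One way to read the suggestion above is sketched below in plain Java. It is an illustration only: the `Vertex` class, its attempt numbering, and all method names are hypothetical and do not reproduce `SpeculativeExecutionVertex`. The idea is that the vertex remembers which attempt number was created on creation or reset, so individual executions need no `original` flag.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of tracking the original attempt in the vertex; not Flink code. */
class OriginalAttemptSketch {

    static final class Vertex {
        private int nextAttemptNumber = 0;
        private int originalAttemptNumber;
        private final List<Integer> currentAttempts = new ArrayList<>();

        Vertex() {
            reset();
        }

        /** Creating or resetting the vertex creates a fresh original attempt. */
        void reset() {
            currentAttempts.clear();
            originalAttemptNumber = nextAttemptNumber++;
            currentAttempts.add(originalAttemptNumber);
        }

        /** Adds a speculative attempt next to the original one and returns its number. */
        int createSpeculativeAttempt() {
            int attemptNumber = nextAttemptNumber++;
            currentAttempts.add(attemptNumber);
            return attemptNumber;
        }

        boolean isOriginalAttempt(int attemptNumber) {
            return attemptNumber == originalAttemptNumber;
        }

        int getOriginalAttemptNumber() {
            return originalAttemptNumber;
        }
    }

    public static void main(String[] args) {
        Vertex vertex = new Vertex();
        int speculative = vertex.createSpeculativeAttempt();
        System.out.println("attempt 0 is original: " + vertex.isOriginalAttempt(0));
        System.out.println("speculative is original: " + vertex.isOriginalAttempt(speculative));

        vertex.reset();
        System.out.println("original after reset: " + vertex.getOriginalAttemptNumber());
    }
}
```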



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java:
##########
@@ -85,6 +91,10 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler
 
     private final SlowTaskDetector slowTaskDetector;
 
+    private long numSlowExecutionVertices;

Review Comment:
   How about changing the name to `currentNumSlowExecutionVertices`? I think `numXXX` generally represents an accumulated value (like `numEffectiveSpeculativeExecutions`).



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java:
##########
@@ -385,6 +385,60 @@ public void testSpeculativeExecutionCombinedWithAdaptiveScheduling() throws Exce
         assertThat(sinkExecutionVertex.getCurrentExecutions()).hasSize(2);
     }
 
+    @Test
+    public void testNumSlowExecutionVerticesMetric() {

Review Comment:
   `public` is not needed



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java:
##########
@@ -385,6 +385,60 @@ public void testSpeculativeExecutionCombinedWithAdaptiveScheduling() throws Exce
         assertThat(sinkExecutionVertex.getCurrentExecutions()).hasSize(2);
     }
 
+    @Test
+    public void testNumSlowExecutionVerticesMetric() {

Review Comment:
    `testSpeculativeExecutionCombinedWithAdaptiveScheduling` has the same problem



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java:
##########
@@ -385,6 +385,60 @@ public void testSpeculativeExecutionCombinedWithAdaptiveScheduling() throws Exce
         assertThat(sinkExecutionVertex.getCurrentExecutions()).hasSize(2);
     }
 
+    @Test
+    public void testNumSlowExecutionVerticesMetric() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
+        final Execution attempt1 = ev.getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, attempt1);
+        assertThat(scheduler.getNumSlowExecutionVertices()).isEqualTo(1);
+
+        // notify a slow vertex twice
+        notifySlowTask(scheduler, attempt1);
+        assertThat(scheduler.getNumSlowExecutionVertices()).isEqualTo(1);
+
+        // vertex no longer slow
+        scheduler.notifySlowTasks(Collections.emptyMap());
+        assertThat(scheduler.getNumSlowExecutionVertices()).isZero();
+    }
+
+    @Test
+    public void testEffectiveSpeculativeExecutionsMetric() {

Review Comment:
   `public` is not needed






[GitHub] [flink] zhuzhurk closed pull request #20321: [FLINK-28138] Add metrics for speculative execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk closed pull request #20321: [FLINK-28138] Add metrics for speculative execution
URL: https://github.com/apache/flink/pull/20321

