Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/21 08:20:01 UTC

[GitHub] [flink] wanglijie95 commented on a diff in pull request #20321: [FLINK-28138] Add metrics for speculative execution

wanglijie95 commented on code in PR #20321:
URL: https://github.com/apache/flink/pull/20321#discussion_r926376468


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java:
##########
@@ -149,6 +159,16 @@ public SpeculativeScheduler(
         this.blocklistOperations = checkNotNull(blocklistOperations);
 
         this.slowTaskDetector = new ExecutionTimeBasedSlowTaskDetector(jobMasterConfiguration);
+
+        this.numEffectiveSpeculativeExecutionsCounter = new SimpleCounter();
+        this.registerMetrics(jobManagerJobMetricGroup);

Review Comment:
   I think it would be better to move the metric registration to `startSchedulingInternal`.
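A minimal sketch of the suggested change. It assumes metrics can be modelled as named `long` suppliers. `SketchMetricGroup`, `SketchSpeculativeScheduler` and `MetricRegistrationDemo` are hypothetical stand-ins, not Flink's `MetricGroup` or `SpeculativeScheduler` API, and the metric names are illustrative. It only shows the constructor storing its dependencies while `startSchedulingInternal` performs the registration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a metric group; this is NOT Flink's MetricGroup API.
class SketchMetricGroup {
    final Map<String, LongSupplier> metrics = new LinkedHashMap<>();

    void register(String name, LongSupplier metric) {
        metrics.put(name, metric);
    }
}

// Hypothetical scheduler skeleton that only illustrates where registration happens.
class SketchSpeculativeScheduler {
    private final SketchMetricGroup metricGroup;
    private long numEffectiveSpeculativeExecutions;
    private long currentNumSlowExecutionVertices;

    SketchSpeculativeScheduler(SketchMetricGroup metricGroup) {
        // The constructor only stores its dependencies; no metrics are registered yet.
        this.metricGroup = metricGroup;
    }

    void startSchedulingInternal() {
        // Metrics are registered when scheduling starts instead of in the constructor.
        registerMetrics();
        // ... the actual scheduling logic would follow here ...
    }

    private void registerMetrics() {
        metricGroup.register("numSlowExecutionVertices", () -> currentNumSlowExecutionVertices);
        metricGroup.register(
                "numEffectiveSpeculativeExecutions", () -> numEffectiveSpeculativeExecutions);
    }
}

class MetricRegistrationDemo {
    public static void main(String[] args) {
        SketchMetricGroup group = new SketchMetricGroup();
        SketchSpeculativeScheduler scheduler = new SketchSpeculativeScheduler(group);
        System.out.println(group.metrics.keySet()); // prints []
        scheduler.startSchedulingInternal();
        // prints [numSlowExecutionVertices, numEffectiveSpeculativeExecutions]
        System.out.println(group.metrics.keySet());
    }
}
```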



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##########
@@ -128,6 +128,12 @@
     /** The unique ID marking the specific execution instant of the task. */
     private final ExecutionAttemptID attemptId;
 
+    /**
+     * This field indicates whether the execution is the original execution of an execution vertex,
+     * i.e. it is created along with the creation or resetting of the execution vertex.
+     */
+    private final boolean original;

Review Comment:
   Maybe we can add an `originalAttemptNumber` field to `SpeculativeExecutionVertex` to record the original execution? Then we don't need to change the logic in `Execution` and `ExecutionVertex`.
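One way to read the `originalAttemptNumber` suggestion, as a heavily simplified, hypothetical model: the class, its methods and the sequential attempt numbering are assumptions, not Flink code. The vertex remembers which attempt it created on creation or reset, so `Execution` itself would not need an `original` flag.

```java
// Hypothetical, heavily simplified model; NOT Flink's SpeculativeExecutionVertex.
final class SketchSpeculativeExecutionVertex {
    private int nextAttemptNumber;
    // Attempt number of the execution created when the vertex was created or last reset.
    private int originalAttemptNumber;

    SketchSpeculativeExecutionVertex() {
        resetForNewExecution();
    }

    /** Creates a fresh execution and records it as the original one. */
    int resetForNewExecution() {
        originalAttemptNumber = nextAttemptNumber++;
        return originalAttemptNumber;
    }

    /** Creates an extra (speculative) execution; the original attempt number is unchanged. */
    int createSpeculativeExecution() {
        return nextAttemptNumber++;
    }

    boolean isOriginalAttempt(int attemptNumber) {
        return attemptNumber == originalAttemptNumber;
    }

    public static void main(String[] args) {
        SketchSpeculativeExecutionVertex vertex = new SketchSpeculativeExecutionVertex();
        int speculative = vertex.createSpeculativeExecution();
        // Assumption: if this attempt finishes the vertex, it could be counted as an
        // effective speculative execution because it is not the original attempt.
        System.out.println(vertex.isOriginalAttempt(speculative)); // prints false
    }
}
```

With this shape, the scheduler could ask the vertex whether a finishing attempt was the original one; whether the PR ends up counting effective speculative executions this way is not stated in the review.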



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java:
##########
@@ -85,6 +91,10 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler
 
     private final SlowTaskDetector slowTaskDetector;
 
+    private long numSlowExecutionVertices;

Review Comment:
   How about changing the name to `currentNumSlowExecutionVertices`? I think a `numXXX` name generally denotes an accumulated value (like `numEffectiveSpeculativeExecutions`), whereas this field holds the current number of slow vertices.
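The naming distinction can be sketched as follows. The sketch assumes, based on the behavior checked in `testNumSlowExecutionVerticesMetric`, that each slow-task report lists all currently slow vertices. Under that assumption the slow-vertex value is replaced on every report and can decrease, while `numEffectiveSpeculativeExecutions` only grows. The class, its methods and the string IDs are hypothetical.

```java
import java.util.List;
import java.util.Map;

// Hypothetical tracker; not the scheduler's actual fields or method signatures.
final class SlowVertexMetricsSketch {
    // "Current" value: replaced on every report, so it can go up and down.
    private long currentNumSlowExecutionVertices;
    // Accumulated value: only ever incremented.
    private long numEffectiveSpeculativeExecutions;

    /** Keys are hypothetical vertex IDs; values are that vertex's slow attempt IDs. */
    void notifySlowTasks(Map<String, List<String>> slowTasks) {
        // Each report is assumed to describe the complete current set of slow vertices.
        currentNumSlowExecutionVertices = slowTasks.size();
    }

    void recordEffectiveSpeculativeExecution() {
        numEffectiveSpeculativeExecutions++;
    }

    long getCurrentNumSlowExecutionVertices() {
        return currentNumSlowExecutionVertices;
    }

    long getNumEffectiveSpeculativeExecutions() {
        return numEffectiveSpeculativeExecutions;
    }

    public static void main(String[] args) {
        SlowVertexMetricsSketch metrics = new SlowVertexMetricsSketch();
        metrics.notifySlowTasks(Map.of("v1", List.of("attempt-1")));
        metrics.notifySlowTasks(Map.of("v1", List.of("attempt-1"))); // same slow vertex again
        metrics.recordEffectiveSpeculativeExecution();
        metrics.notifySlowTasks(Map.of()); // no vertex is slow any more
        System.out.println(metrics.getCurrentNumSlowExecutionVertices()); // prints 0
        System.out.println(metrics.getNumEffectiveSpeculativeExecutions()); // prints 1
    }
}
```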



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java:
##########
@@ -385,6 +385,60 @@ public void testSpeculativeExecutionCombinedWithAdaptiveScheduling() throws Exce
         assertThat(sinkExecutionVertex.getCurrentExecutions()).hasSize(2);
     }
 
+    @Test
+    public void testNumSlowExecutionVerticesMetric() {

Review Comment:
   `public` is not needed (JUnit 5 test methods can be package-private).



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java:
##########
@@ -385,6 +385,60 @@ public void testSpeculativeExecutionCombinedWithAdaptiveScheduling() throws Exce
         assertThat(sinkExecutionVertex.getCurrentExecutions()).hasSize(2);
     }
 
+    @Test
+    public void testNumSlowExecutionVerticesMetric() {

Review Comment:
   `testSpeculativeExecutionCombinedWithAdaptiveScheduling` has the same problem: its `public` modifier is not needed either.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java:
##########
@@ -385,6 +385,60 @@ public void testSpeculativeExecutionCombinedWithAdaptiveScheduling() throws Exce
         assertThat(sinkExecutionVertex.getCurrentExecutions()).hasSize(2);
     }
 
+    @Test
+    public void testNumSlowExecutionVerticesMetric() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
+        final Execution attempt1 = ev.getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, attempt1);
+        assertThat(scheduler.getNumSlowExecutionVertices()).isEqualTo(1);
+
+        // notify a slow vertex twice
+        notifySlowTask(scheduler, attempt1);
+        assertThat(scheduler.getNumSlowExecutionVertices()).isEqualTo(1);
+
+        // vertex no longer slow
+        scheduler.notifySlowTasks(Collections.emptyMap());
+        assertThat(scheduler.getNumSlowExecutionVertices()).isZero();
+    }
+
+    @Test
+    public void testEffectiveSpeculativeExecutionsMetric() {

Review Comment:
   `public` is not needed (JUnit 5 test methods can be package-private).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
