Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/18 10:06:27 UTC

[GitHub] [flink] pltbkd opened a new pull request, #20296: [FLINK-28588] Enhance REST API for Speculative Execution

pltbkd opened a new pull request, #20296:
URL: https://github.com/apache/flink/pull/20296

   <!--
   *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
     
     - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
     Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Azure Pipelines CI to do that following [this guide](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository).
   
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message (including the JIRA id)
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
   As a follow-up step of FLIP-168 and FLIP-224, the Flink Web UI needs to be enhanced to display the related information when the speculative execution mechanism is enabled. This PR introduces the corresponding REST API changes.
   
   Note: All commits except the last one are for the Blocklist Mechanism and will be removed once its related pull requests are merged.
   
   ## Brief change log
   
     - A "other-concurrent-attempts" field is added to SubtaskExecutionAttemptDetailsInfo and SubtaskBackPressureInfo, to hold the information of other concurrent attempts other than the fastest one.
     - Count of blocked task managers and slots is added to the ResourceOverview.
     - Blocked flag is added to the TaskManagerInfo, indicating if the task manager is blocked.
     - MetricStore is modified to hold the metrics of more than one current execution attempt.
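
   For illustration, a hedged sketch of a possible subtask details response shape with the new field; the sibling fields and values here are illustrative, and only `other-concurrent-attempts` is introduced by this PR:

   ```json
   {
     "subtask": 0,
     "attempt": 1,
     "status": "RUNNING",
     "other-concurrent-attempts": [
       { "subtask": 0, "attempt": 0, "status": "RUNNING" }
     ]
   }
   ```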
   
   ## Verifying this change
   
   This change adds tests and can be verified by running all unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? The feature is documented in another PR.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zhuzhurk commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932883301


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java:
##########
@@ -182,30 +188,32 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
                 allFinished &= state.isTerminal();
                 endTime = Math.max(endTime, vertex.getStateTimestamp(state));
 
-                counts.addIOMetrics(
-                        vertex.getCurrentExecutionAttempt(),
-                        metricFetcher,
-                        jobID.toString(),
-                        jobVertex.getJobVertexId().toString());
-                MutableIOMetrics current = new MutableIOMetrics();
-                current.addIOMetrics(
-                        vertex.getCurrentExecutionAttempt(),
-                        metricFetcher,
-                        jobID.toString(),
-                        jobVertex.getJobVertexId().toString());
-                ioMetricsInfos.add(
-                        new IOMetricsInfo(
-                                current.getNumBytesIn(),
-                                current.isNumBytesInComplete(),
-                                current.getNumBytesOut(),
-                                current.isNumBytesOutComplete(),
-                                current.getNumRecordsIn(),
-                                current.isNumRecordsInComplete(),
-                                current.getNumRecordsOut(),
-                                current.isNumRecordsOutComplete(),
-                                current.getAccumulateBackPressuredTime(),
-                                current.getAccumulateIdleTime(),
-                                current.getAccumulateBusyTime()));
+                for (AccessExecution attempt : vertex.getCurrentExecutions()) {

Review Comment:
   I think it can help to display all the attempts on that TM. And then we should also include them in the aggregated IO metrics of the TM. Regarding the status shown on that tab, I think it can be an aggregation of all the **representative** attempts on that task manager, so that it can be FINISHED if the job successfully finishes.
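
   A minimal sketch of such an aggregation; `verticesOnThisTaskManager` is an illustrative name, not the PR's code:

   ```java
   // Derive the TM-level status from the representative attempts on this task
   // manager only; any non-terminal representative keeps the TM from being
   // reported as FINISHED.
   boolean allFinished = true;
   for (AccessExecutionVertex vertex : verticesOnThisTaskManager) {
       ExecutionState state = vertex.getCurrentExecutionAttempt().getState();
       allFinished &= state.isTerminal();
   }
   ```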





[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932851211


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null
+                                ? -1
+                                : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
+                if (!allAttemptsMetricStores.containsKey(currentAttempt)) {

Review Comment:
   I don't think so. The issue only exists while fetching the metrics of a subtask or a task, in that the acquired metrics may not be from the latest representative attempt. The archived execution graph records which attempt is the representative one, and the metrics are acquired by attempt number. So neither the state nor the metrics for each attempt will be wrong.
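
   A hedged sketch of the lookup being described, using accessors that appear elsewhere in this PR:

   ```java
   // The representative attempt number comes from the (archived) execution
   // graph, and the metrics are fetched for that specific attempt, so the
   // state and the metrics cannot come from different attempts.
   int representativeAttempt =
           executionVertex.getCurrentExecutionAttempt().getAttemptNumber();
   ComponentMetricStore attemptMetrics =
           subtaskMetricStore.getAttemptsMetricStore(representativeAttempt);
   ```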





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932010754


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java:
##########
@@ -131,18 +132,23 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
         Map<String, String> taskManagerId2Host = new HashMap<>();
         Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
         for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
-            TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
-            String taskManagerHost =
-                    location == null
-                            ? "(unassigned)"
-                            : location.getHostname() + ':' + location.dataPort();
-            String taskmanagerId =
-                    location == null ? "(unassigned)" : location.getResourceID().toString();
-            taskManagerId2Host.put(taskmanagerId, taskManagerHost);
-            List<AccessExecutionVertex> vertices =
-                    taskManagerVertices.computeIfAbsent(
-                            taskmanagerId, ignored -> new ArrayList<>(4));
-            vertices.add(vertex);
+            List<TaskManagerLocation> locations =
+                    vertex.getCurrentExecutions().stream()
+                            .map(AccessExecution::getAssignedResourceLocation)
+                            .collect(Collectors.toList());
+            for (TaskManagerLocation location : locations) {
+                String taskManagerHost =
+                        location == null
+                                ? "(unassigned)"
+                                : location.getHostname() + ':' + location.dataPort();
+                String taskmanagerId =
+                        location == null ? "(unassigned)" : location.getResourceID().toString();
+                taskManagerId2Host.put(taskmanagerId, taskManagerHost);
+                List<AccessExecutionVertex> vertices =
+                        taskManagerVertices.computeIfAbsent(
+                                taskmanagerId, ignored -> new ArrayList<>(4));

Review Comment:
   Better to avoid such a magic-number initial capacity (although it was added previously).
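
   One way to address it:

   ```java
   List<AccessExecutionVertex> vertices =
           taskManagerVertices.computeIfAbsent(taskmanagerId, ignored -> new ArrayList<>());
   ```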



##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java:
##########
@@ -257,18 +258,19 @@ private Map<TaskManagerLocation, ImmutableSet<ExecutionAttemptID>> groupExecutio
                         executionVertex.getExecutionState());
                 continue;
             }
-            TaskManagerLocation tmLocation = executionVertex.getCurrentAssignedResourceLocation();
-            if (tmLocation == null) {
-                LOG.trace("ExecutionVertex {} is currently not assigned", executionVertex);
-                continue;
-            }
-            Set<ExecutionAttemptID> groupedAttemptIds =
-                    executionAttemptsByLocation.getOrDefault(tmLocation, new HashSet<>());
+            for (AccessExecution execution : executionVertex.getCurrentExecutions()) {

Review Comment:
   To double confirm, would any problem occur if one subtask has multiple attempts?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java:
##########
@@ -111,36 +111,44 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
         List<ArchivedJson> archive = new ArrayList<>(16);
         for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
             for (AccessExecutionVertex subtask : task.getTaskVertices()) {
-                ResponseBody curAttemptJson =
-                        SubtaskExecutionAttemptDetailsInfo.create(
-                                subtask.getCurrentExecutionAttempt(),
-                                null,
-                                graph.getJobID(),
-                                task.getJobVertexId());
-                String curAttemptPath =
-                        getMessageHeaders()
-                                .getTargetRestEndpointURL()
-                                .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
-                                .replace(
-                                        ':' + JobVertexIdPathParameter.KEY,
-                                        task.getJobVertexId().toString())
-                                .replace(
-                                        ':' + SubtaskIndexPathParameter.KEY,
-                                        String.valueOf(subtask.getParallelSubtaskIndex()))
-                                .replace(
-                                        ':' + SubtaskAttemptPathParameter.KEY,
-                                        String.valueOf(
-                                                subtask.getCurrentExecutionAttempt()
-                                                        .getAttemptNumber()));
-
-                archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+                for (AccessExecution attempt : subtask.getCurrentExecutions()) {
+                    if (attempt != null) {

Review Comment:
   In which case can `attempt` be null?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null

Review Comment:
   In which case can `currentExecutionAttempts` be null?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null
+                                ? -1
+                                : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
+                if (!allAttemptsMetricStores.containsKey(currentAttempt)) {
+                    // allAttemptsMetricStores is not empty here
+                    currentAttempt = allAttemptsMetricStores.keySet().iterator().next();
+                }
+                List<SubtaskBackPressureInfo> otherConcurrentAttempts =
+                        new ArrayList<>(allAttemptsMetricStores.size() - 1);
+                for (Map.Entry<Integer, ComponentMetricStore> attemptStore :
+                        allAttemptsMetricStores.entrySet()) {
+                    if (attemptStore.getKey() == currentAttempt) {
+                        continue;
+                    }
+                    otherConcurrentAttempts.add(
+                            createSubtaskAttemptBackpressureInfo(
+                                    subtaskIndex,
+                                    attemptStore.getKey(),
+                                    attemptStore.getValue(),
+                                    null));
+                }
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex,
+                                currentAttempt,
+                                allAttemptsMetricStores.get(currentAttempt),
+                                otherConcurrentAttempts));
+            }
         }
         result.sort(Comparator.comparingInt(SubtaskBackPressureInfo::getSubtask));
         return result;
     }
 
+    private SubtaskBackPressureInfo createSubtaskAttemptBackpressureInfo(
+            int subtaskIndex,
+            Integer attemptNumber,

Review Comment:
   Better to annotate it and otherConcurrentAttempts as Nullable.
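
   A sketch of the suggested annotations; the last two parameters are inferred from the call sites above, and the parameter name `metricStore` is illustrative:

   ```java
   private SubtaskBackPressureInfo createSubtaskAttemptBackpressureInfo(
           int subtaskIndex,
           @Nullable Integer attemptNumber,
           ComponentMetricStore metricStore,
           @Nullable List<SubtaskBackPressureInfo> otherConcurrentAttempts) {
       // ...
   }
   ```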



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java:
##########
@@ -100,6 +106,36 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
     @JsonProperty(FIELD_NAME_STATUS_DURATION)
     private final Map<ExecutionState, Long> statusDuration;
 
+    @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS)
+    @JsonInclude(Include.NON_EMPTY)
+    @Nullable
+    private final List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts;
+
+    public SubtaskExecutionAttemptDetailsInfo(

Review Comment:
   Is this for tests only? If so, I would suggest removing it and refactoring the tests instead.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null
+                                ? -1
+                                : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
+                if (!allAttemptsMetricStores.containsKey(currentAttempt)) {
+                    // allAttemptsMetricStores is not empty here
+                    currentAttempt = allAttemptsMetricStores.keySet().iterator().next();
+                }
+                List<SubtaskBackPressureInfo> otherConcurrentAttempts =
+                        new ArrayList<>(allAttemptsMetricStores.size() - 1);
+                for (Map.Entry<Integer, ComponentMetricStore> attemptStore :
+                        allAttemptsMetricStores.entrySet()) {
+                    if (attemptStore.getKey() == currentAttempt) {
+                        continue;
+                    }
+                    otherConcurrentAttempts.add(
+                            createSubtaskAttemptBackpressureInfo(
+                                    subtaskIndex,
+                                    attemptStore.getKey(),
+                                    attemptStore.getValue(),
+                                    null));
+                }
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex,
+                                currentAttempt,
+                                allAttemptsMetricStores.get(currentAttempt),
+                                otherConcurrentAttempts));
+            }
         }
         result.sort(Comparator.comparingInt(SubtaskBackPressureInfo::getSubtask));

Review Comment:
   Is this a stable sort?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -391,4 +476,36 @@ private static TaskMetricStore unmodifiable(TaskMetricStore source) {
                     unmodifiableMap(source.metrics), unmodifiableMap(source.subtasks));
         }
     }
+
+    /** Sub-structure containing metrics of a single subtask. */
+    @ThreadSafe
+    public static class SubtaskMetricStore extends ComponentMetricStore {
+        private final Map<Integer, ComponentMetricStore> attempts;
+
+        private SubtaskMetricStore() {
+            this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>());
+        }
+
+        private SubtaskMetricStore(
+                Map<String, String> metrics, Map<Integer, ComponentMetricStore> attempts) {
+            super(metrics);
+            this.attempts = checkNotNull(attempts);
+        }
+
+        private static SubtaskMetricStore unmodifiable(SubtaskMetricStore source) {
+            if (source == null) {
+                return null;
+            }
+            return new SubtaskMetricStore(
+                    unmodifiableMap(source.metrics), unmodifiableMap(source.attempts));
+        }
+
+        public ComponentMetricStore getAttemptsMetricStore(int attemptId) {
+            return attempts.get(attemptId);
+        }
+
+        public Map<Integer, ComponentMetricStore> getAllAttemptsMetricStores() {
+            return attempts;

Review Comment:
   Better to make it unmodifiable.
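
   For example, mirroring the `unmodifiableMap` usage already present in this class:

   ```java
   public Map<Integer, ComponentMetricStore> getAllAttemptsMetricStores() {
       return unmodifiableMap(attempts);
   }
   ```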



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java:
##########
@@ -99,25 +99,27 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
         List<ArchivedJson> archive = new ArrayList<>(16);
         for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
             for (AccessExecutionVertex subtask : task.getTaskVertices()) {
-                ResponseBody curAttemptJson =
-                        createAccumulatorInfo(subtask.getCurrentExecutionAttempt());
-                String curAttemptPath =
-                        getMessageHeaders()
-                                .getTargetRestEndpointURL()
-                                .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
-                                .replace(
-                                        ':' + JobVertexIdPathParameter.KEY,
-                                        task.getJobVertexId().toString())
-                                .replace(
-                                        ':' + SubtaskIndexPathParameter.KEY,
-                                        String.valueOf(subtask.getParallelSubtaskIndex()))
-                                .replace(
-                                        ':' + SubtaskAttemptPathParameter.KEY,
-                                        String.valueOf(
-                                                subtask.getCurrentExecutionAttempt()
-                                                        .getAttemptNumber()));
-
-                archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+                for (AccessExecution attempt : subtask.getCurrentExecutions()) {
+                    if (attempt != null) {

Review Comment:
   In which case can `attempt` be null?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null
+                                ? -1
+                                : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
+                if (!allAttemptsMetricStores.containsKey(currentAttempt)) {

Review Comment:
   Is there any case that `currentAttempt != -1` but it is not contained in `allAttemptsMetricStores`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java:
##########
@@ -182,30 +188,32 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
                 allFinished &= state.isTerminal();
                 endTime = Math.max(endTime, vertex.getStateTimestamp(state));
 
-                counts.addIOMetrics(
-                        vertex.getCurrentExecutionAttempt(),
-                        metricFetcher,
-                        jobID.toString(),
-                        jobVertex.getJobVertexId().toString());
-                MutableIOMetrics current = new MutableIOMetrics();
-                current.addIOMetrics(
-                        vertex.getCurrentExecutionAttempt(),
-                        metricFetcher,
-                        jobID.toString(),
-                        jobVertex.getJobVertexId().toString());
-                ioMetricsInfos.add(
-                        new IOMetricsInfo(
-                                current.getNumBytesIn(),
-                                current.isNumBytesInComplete(),
-                                current.getNumBytesOut(),
-                                current.isNumBytesOutComplete(),
-                                current.getNumRecordsIn(),
-                                current.isNumRecordsInComplete(),
-                                current.getNumRecordsOut(),
-                                current.isNumRecordsOutComplete(),
-                                current.getAccumulateBackPressuredTime(),
-                                current.getAccumulateIdleTime(),
-                                current.getAccumulateBusyTime()));
+                for (AccessExecution attempt : vertex.getCurrentExecutions()) {

Review Comment:
   This looks incorrect to me, because the attempts of an execution vertex may be located in different task managers.
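
   A hedged sketch of the per-TM filtering this implies (the `taskmanagerId` comparison is illustrative, not the PR's code):

   ```java
   for (AccessExecution attempt : vertex.getCurrentExecutions()) {
       TaskManagerLocation location = attempt.getAssignedResourceLocation();
       // only aggregate attempts that actually run on this task manager
       if (location != null
               && location.getResourceID().toString().equals(taskmanagerId)) {
           counts.addIOMetrics(
                   attempt,
                   metricFetcher,
                   jobID.toString(),
                   jobVertex.getJobVertexId().toString());
       }
   }
   ```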





[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r929778622


##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java:
##########
@@ -31,6 +31,6 @@ protected Class<ClusterOverviewWithVersion> getTestResponseClass() {
 
     @Override
     protected ClusterOverviewWithVersion getTestResponseInstance() {
-        return new ClusterOverviewWithVersion(1, 3, 3, 7, 4, 2, 0, "version", "commit");
+        return new ClusterOverviewWithVersion(2, 6, 3, 1, 3, 7, 4, 2, 0, "version", "commit");

Review Comment:
   I think we have to modify it, because the blocked TMs and available TMs are expected to be non-zero for the test, so the total TaskManager count has to be more than 1.





[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r929780673


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java:
##########
@@ -64,6 +66,7 @@ public TaskManagerDetailsInfo(
             @JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo freeResource,
             @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription,
             @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration,
+            @JsonProperty(FIELD_NAME_BLOCKED) @Nullable Boolean blocked,

Review Comment:
   It's the same case as above





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932838778


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null
+                                ? -1
+                                : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
+                if (!allAttemptsMetricStores.containsKey(currentAttempt)) {

Review Comment:
   Does this mean the representative attempt displayed in the WebUI may not be the true representative attempt? E.g. a CANCELED attempt is displayed as the representative attempt while there is another RUNNING/FINISHED attempt.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r926285695


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java:
##########
@@ -28,14 +28,18 @@ public class ResourceOverview implements Serializable {
     private static final long serialVersionUID = 7618746920569224557L;
 
     private static final ResourceOverview EMPTY_RESOURCE_OVERVIEW =
-            new ResourceOverview(0, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO);
+            new ResourceOverview(0, 0, 0, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO);
 
     private final int numberTaskManagers;
 
     private final int numberRegisteredSlots;
 
     private final int numberFreeSlots;
 
+    private final int numberBlockedTaskManagaers;

Review Comment:
   numberBlockedTaskManagaers -> numberBlockedTaskManagers



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java:
##########
@@ -65,6 +73,14 @@ public int getNumberFreeSlots() {
         return numberFreeSlots;
     }
 
+    public int getNumberBlockedTaskManagaers() {

Review Comment:
   getNumberBlockedTaskManagaers -> getNumberBlockedTaskManagers



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -40,13 +43,25 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 
     private final ArchivedExecution currentExecution; // this field must never be null
 
+    private final Collection<AccessExecution> currentExecutions;
+
     // ------------------------------------------------------------------------
 
     public ArchivedExecutionVertex(ExecutionVertex vertex) {
         this.subTaskIndex = vertex.getParallelSubtaskIndex();
         this.executionHistory = getCopyOfExecutionHistory(vertex);
         this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
-        this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
+
+        Execution vertexCurrentExecution = vertex.getCurrentExecutionAttempt();
+        assert vertex.getCurrentExecutions().contains(vertexCurrentExecution);
+        currentExecutions = new ArrayList<>(vertex.getCurrentExecutions().size());
+        currentExecution = vertexCurrentExecution.archive();
+        currentExecutions.add(currentExecution);
+        for (Execution execution : vertex.getCurrentExecutions()) {
+            if (execution != vertexCurrentExecution) {
+                currentExecutions.add(execution.archive());
+            }
+        }
     }
 
     public ArchivedExecutionVertex(

Review Comment:
   Let's annotate it as `VisibleForTesting` because now it is only used for testing purposes.
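
   i.e., a fragment showing the suggested annotation (constructor parameters elided):

   ```java
   @VisibleForTesting
   public ArchivedExecutionVertex(...)
   ```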



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java:
##########
@@ -46,6 +47,8 @@ public interface AccessExecutionVertex {
      */
     AccessExecution getCurrentExecutionAttempt();
 
+    <T extends AccessExecution> Collection<T> getCurrentExecutions();

Review Comment:
   A java doc should be added for it.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java:
##########
@@ -64,6 +66,7 @@ public TaskManagerDetailsInfo(
             @JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo freeResource,
             @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription,
             @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration,
+            @JsonProperty(FIELD_NAME_BLOCKED) @Nullable Boolean blocked,

Review Comment:
   Maybe it can just be a `boolean`? See the above comment.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java:
##########
@@ -103,4 +105,33 @@ public void testJobDetailsCompatibleUnmarshalling() throws IOException {
 
         assertEquals(expected, unmarshalled);
     }
+
+    @Test
+    public void testJobDetailsWithExecutionAttemptsMarshalling() throws JsonProcessingException {
+        Map<String, Map<Integer, Integer>> currentExecutionAttempts = new HashMap<>();
+        currentExecutionAttempts.computeIfAbsent("a", k -> new HashMap<>()).put(1, 2);
+        currentExecutionAttempts.computeIfAbsent("a", k -> new HashMap<>()).put(2, 4);
+        currentExecutionAttempts.computeIfAbsent("b", k -> new HashMap<>()).put(3, 1);
+
+        final JobDetails expected =
+                new JobDetails(
+                        new JobID(),
+                        "foobar",
+                        1L,
+                        10L,
+                        9L,
+                        JobStatus.RUNNING,
+                        8L,
+                        new int[] {1, 3, 3, 4, 7, 4, 2, 7, 3, 3},
+                        42,
+                        currentExecutionAttempts);
+
+        final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+        final JsonNode marshalled = objectMapper.valueToTree(expected);
+
+        final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, JobDetails.class);
+
+        assertEquals(expected, unmarshalled);

Review Comment:
   Flink is migrating to JUnit 5, so all new tests should use JUnit 5 and AssertJ.
   You can migrate the existing tests to JUnit 5 in a hotfix commit before making functional changes.
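
   A minimal sketch of the suggested style, reusing the round-trip from the test above (the `objectMapper`, `marshalled`, and `expected` setup is the same as in the diff):

   ```java
   import static org.assertj.core.api.Assertions.assertThat;

   import org.junit.jupiter.api.Test;

   class JobDetailsTest {

       @Test
       void testJobDetailsWithExecutionAttemptsMarshalling() throws Exception {
           // build `expected` and round-trip it through the ObjectMapper as above
           final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, JobDetails.class);
           assertThat(unmarshalled).isEqualTo(expected);
       }
   }
   ```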



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java:
##########
@@ -28,14 +28,18 @@ public class ResourceOverview implements Serializable {
     private static final long serialVersionUID = 7618746920569224557L;
 
     private static final ResourceOverview EMPTY_RESOURCE_OVERVIEW =
-            new ResourceOverview(0, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO);
+            new ResourceOverview(0, 0, 0, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO);
 
     private final int numberTaskManagers;
 
     private final int numberRegisteredSlots;
 
     private final int numberFreeSlots;
 
+    private final int numberBlockedTaskManagaers;
+
+    private final int numberBlockedSlots;

Review Comment:
   I think `numberBlockedFreeSlots` would be more accurate and easier to understand.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java:
##########
@@ -31,6 +31,6 @@ protected Class<ClusterOverviewWithVersion> getTestResponseClass() {
 
     @Override
     protected ClusterOverviewWithVersion getTestResponseInstance() {
-        return new ClusterOverviewWithVersion(1, 3, 3, 7, 4, 2, 0, "version", "commit");
+        return new ClusterOverviewWithVersion(2, 6, 3, 1, 3, 7, 4, 2, 0, "version", "commit");

Review Comment:
   It's better to keep the other params as-is and just add values for the newly introduced params.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java:
##########
@@ -142,22 +142,37 @@ public static class TaskQueryScopeInfo extends QueryScopeInfo {
         public final String jobID;
         public final String vertexID;
         public final int subtaskIndex;
+        public final int attemptNum;
 
         public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex) {

Review Comment:
   It looks to me like there are not many invocations of these constructors, so I prefer to change the original constructors and fix the tests instead of adding new ones.
   
   Besides that, the default attemptNumber should be 0 instead of -1.
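
   i.e., roughly (a sketch, not the PR's code):

   ```java
   // Single remaining constructor; call sites that previously relied on a
   // default would pass the attempt number 0 explicitly.
   public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex, int attemptNumber) {
       // field assignments as in the existing constructor, plus:
       this.attemptNum = attemptNumber;
   }
   ```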



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -671,7 +674,8 @@ public CompletableFuture<TaskManagerInfoWithSlots> requestTaskManagerDetailsInfo
                                     slotManager.getRegisteredResourceOf(instanceId),
                                     slotManager.getFreeResourceOf(instanceId),
                                     taskExecutor.getHardwareDescription(),
-                                    taskExecutor.getMemoryConfiguration()),
+                                    taskExecutor.getMemoryConfiguration(),
+                                    blocked),

Review Comment:
   nit: we can invoke `isBlockedTaskManager` here, just like the others. This helps the code look better.
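
   i.e., a suggestion-style fragment:

   ```java
                                       taskExecutor.getMemoryConfiguration(),
                                       blocklistHandler.isBlockedTaskManager(
                                               taskExecutor.getResourceID())),
   ```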



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -630,6 +630,7 @@ public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Tim
             final ResourceID resourceId = taskExecutorEntry.getKey();
             final WorkerRegistration<WorkerType> taskExecutor = taskExecutorEntry.getValue();
 
+            boolean blocked = blocklistHandler.isBlockedTaskManager(taskExecutor.getResourceID());

Review Comment:
   It's better to make local variables `final`, if possible.
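
   e.g.:

   ```java
   final boolean blocked =
           blocklistHandler.isBlockedTaskManager(taskExecutor.getResourceID());
   ```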



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -79,6 +95,11 @@ public ArchivedExecution getCurrentExecutionAttempt() {
         return currentExecution;
     }
 
+    @Override
+    public Collection<AccessExecution> getCurrentExecutions() {
+        return currentExecutions;

Review Comment:
   ```suggestion
           return Collections.unmodifiableCollection(currentExecutions);
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java:
##########
@@ -142,22 +142,37 @@ public static class TaskQueryScopeInfo extends QueryScopeInfo {
         public final String jobID;
         public final String vertexID;
         public final int subtaskIndex;
+        public final int attemptNum;

Review Comment:
   I would suggest to name it as `attemptNumber`, which is commonly used in a lot of places already.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java:
##########
@@ -125,6 +136,7 @@ public TaskManagerInfo(
         this.freeResource = freeResource;
         this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
         this.memoryConfiguration = Preconditions.checkNotNull(memoryConfiguration);
+        this.blocked = blocked != null && blocked;

Review Comment:
   I prefer to add parentheses here to clearly show the order of evaluation.
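
   i.e.:

   ```java
   this.blocked = (blocked != null && blocked);
   ```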



##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java:
##########
@@ -495,6 +496,10 @@ protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initiali
         final SubtaskMetricsHandler subtaskMetricsHandler =
                 new SubtaskMetricsHandler(leaderRetriever, timeout, responseHeaders, metricFetcher);
 
+        final SubtaskAttemptMetricsHandler subtaskAttemptMetricsHandler =

Review Comment:
   I think this REST API is not in the scope of this FLIP. Therefore, I think we should remove it.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -40,13 +43,25 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 
     private final ArchivedExecution currentExecution; // this field must never be null
 
+    private final Collection<AccessExecution> currentExecutions;
+
     // ------------------------------------------------------------------------
 
     public ArchivedExecutionVertex(ExecutionVertex vertex) {
         this.subTaskIndex = vertex.getParallelSubtaskIndex();
         this.executionHistory = getCopyOfExecutionHistory(vertex);
         this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
-        this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
+
+        Execution vertexCurrentExecution = vertex.getCurrentExecutionAttempt();
+        assert vertex.getCurrentExecutions().contains(vertexCurrentExecution);

Review Comment:
   Better to use `Preconditions.checkState`.
   
   And it's better to check it in a more performant way, like:
   ```
    checkState(vertexCurrentExecution == vertex.getCurrentExecution(vertexCurrentExecution.getAttemptNumber()));
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java:
##########
@@ -113,7 +123,8 @@ public TaskManagerInfo(
             @JsonProperty(FIELD_NAME_TOTAL_RESOURCE) ResourceProfileInfo totalResource,
             @JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo freeResource,
             @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription,
-            @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration) {
+            @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration,
+            @JsonProperty(FIELD_NAME_BLOCKED) @Nullable Boolean blocked) {

Review Comment:
   Why declare `blocked` as a `@Nullable Boolean` if we will ultimately turn it into a `boolean` and exclude it when it is false?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java:
##########
@@ -174,23 +189,44 @@ public static class OperatorQueryScopeInfo extends QueryScopeInfo {
         public final String jobID;
         public final String vertexID;
         public final int subtaskIndex;
+        public final int attemptNum;
         public final String operatorName;
 
         public OperatorQueryScopeInfo(

Review Comment:
   It looks to me like there are not many invocations of these constructors, so I prefer to change the original constructors and fix the tests instead of adding new ones.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java:
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for the {@link SpeculativeExecutionVertex}. */
+class ArchivedExecutionVertexWithSpeculativeExecutionTest {
+
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    private TestingInternalFailuresListener internalFailuresListener;
+
+    @BeforeEach
+    void setUp() {
+        internalFailuresListener = new TestingInternalFailuresListener();
+    }
+
+    @Test
+    void testCreateSpeculativeExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+
+        ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        assertThat(ev.getCurrentExecutions()).hasSize(2);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testResetExecutionVertex() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        e1.transitionState(ExecutionState.RUNNING);
+        e1.markFinished();
+        e2.cancel();
+        ev.resetForNewExecution();
+
+        assertThat(
+                        ev.getExecutionHistory()
+                                .getHistoricalExecution(0)
+                                .orElseThrow(NullPointerException::new)
+                                .getAttemptId())
+                .isEqualTo(e1.getAttemptId());
+        assertThat(
+                        ev.getExecutionHistory()
+                                .getHistoricalExecution(1)
+                                .orElseThrow(NullPointerException::new)
+                                .getAttemptId())
+                .isEqualTo(e2.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.getCurrentExecutionAttempt().getAttemptNumber()).isEqualTo(2);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testCancel() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.cancel();
+        assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED);
+        assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testSuspend() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.suspend();
+        assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED);
+        assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testFail() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.fail(new Exception("Forced test failure."));
+        assertThat(internalFailuresListener.getFailedTasks())
+                .containsExactly(e1.getAttemptId(), e2.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testMarkFailed() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.markFailed(new Exception("Forced test failure."));
+        assertThat(internalFailuresListener.getFailedTasks())
+                .containsExactly(e1.getAttemptId(), e2.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testVertexTerminationAndJobTermination() throws Exception {
+        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+        final ExecutionGraph eg = createExecutionGraph(jobGraph);
+        eg.transitionToRunning();
+
+        ExecutionJobVertex jv = eg.getJobVertex(jobVertex.getID());
+        assert jv != null;
+        final SpeculativeExecutionVertex ev = (SpeculativeExecutionVertex) jv.getTaskVertices()[0];
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        final CompletableFuture<?> terminationFuture = ev.getTerminationFuture();
+
+        e1.transitionState(ExecutionState.RUNNING);
+        e1.markFinished();
+        assertThat(terminationFuture.isDone()).isFalse();
+        assertThat(eg.getState()).isSameAs(JobStatus.RUNNING);
+
+        e2.cancel();
+        assertThat(terminationFuture.isDone()).isTrue();
+        assertThat(eg.getState()).isSameAs(JobStatus.FINISHED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testArchiveFailedExecutions() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.RUNNING);
+
+        final Execution e2 = ev.createNewSpeculativeExecution(0);
+        e2.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e2.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.currentExecution).isSameAs(e1);
+
+        final Execution e3 = ev.createNewSpeculativeExecution(0);
+        e3.transitionState(ExecutionState.RUNNING);
+        e1.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e1.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.currentExecution).isSameAs(e3);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testArchiveTheOnlyCurrentExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e1.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testGetExecutionState() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.CANCELED);
+
+        // a state added later is more likely to reach the FINISHED state
+        final List<ExecutionState> statesSortedByPriority = new ArrayList<>();
+        statesSortedByPriority.add(ExecutionState.FAILED);
+        statesSortedByPriority.add(ExecutionState.CANCELING);
+        statesSortedByPriority.add(ExecutionState.CREATED);
+        statesSortedByPriority.add(ExecutionState.SCHEDULED);
+        statesSortedByPriority.add(ExecutionState.DEPLOYING);
+        statesSortedByPriority.add(ExecutionState.INITIALIZING);
+        statesSortedByPriority.add(ExecutionState.RUNNING);
+        statesSortedByPriority.add(ExecutionState.FINISHED);
+
+        for (ExecutionState state : statesSortedByPriority) {
+            final Execution execution = ev.createNewSpeculativeExecution(0);
+            execution.transitionState(state);
+        }
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception {
+        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        ExecutionJobVertex jv = executionGraph.getJobVertex(jobVertex.getID());
+        assert jv != null;
+        return (SpeculativeExecutionVertex) jv.getTaskVertices()[0];
+    }
+
+    private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
+        final ExecutionGraph executionGraph =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .setExecutionJobVertexFactory(new SpeculativeExecutionJobVertex.Factory())
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        executionGraph.setInternalTaskFailuresListener(internalFailuresListener);
+        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+        return executionGraph;
+    }
+
+    private static void compareExecutionVertex(
+            AccessExecutionVertex runtimeVertex, AccessExecutionVertex archivedVertex) {
+        assertEquals(

Review Comment:
   JUnit assertions should be avoided. See https://flink.apache.org/contributing/code-style-and-quality-common.html#testing.
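
   For illustration, a sketch of the suggested migration to AssertJ for one representative accessor (the remaining fields would be converted the same way):

   ```java
   // instead of:
   // assertEquals(runtimeVertex.getParallelSubtaskIndex(), archivedVertex.getParallelSubtaskIndex());
   assertThat(archivedVertex.getParallelSubtaskIndex())
           .isEqualTo(runtimeVertex.getParallelSubtaskIndex());
   ```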





[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932469341


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null
+                                ? -1
+                                : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
+                if (!allAttemptsMetricStores.containsKey(currentAttempt)) {

Review Comment:
   Since the JobDetails and the MetricStore are acquired independently, an attempt may still exist when the JobDetails is acquired but have already failed by the time the MetricStore is acquired.
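
   A sketch of one way the handler could tolerate that race (names follow the quoted diff; the fallback choice is an assumption):

   ```java
   if (!allAttemptsMetricStores.containsKey(currentAttempt)) {
       // The attempt reported by JobDetails may already be gone from the metric
       // store; fall back to one of the attempts that still has metrics.
       currentAttempt = allAttemptsMetricStores.keySet().iterator().next();
   }
   ```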





[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932890270


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java:
##########
@@ -120,9 +120,24 @@ private static JobVertexDetailsInfo createJobVertexDetailsInfo(
         for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
             final AccessExecution execution = vertex.getCurrentExecutionAttempt();
             final JobVertexID jobVertexID = jobVertex.getJobVertexId();
+
+            final Collection<AccessExecution> attempts = vertex.getCurrentExecutions();
+            List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts = null;
+
+            if (attempts.size() > 1) {
+                otherConcurrentAttempts = new ArrayList<>();
+                for (AccessExecution attempt : attempts) {
+                    if (attempt.getAttemptNumber() != execution.getAttemptNumber()) {
+                        otherConcurrentAttempts.add(
+                                SubtaskExecutionAttemptDetailsInfo.create(
+                                        attempt, metricFetcher, jobID, jobVertexID, null));
+                    }
+                }
+            }
+
             subtasks.add(
                     SubtaskExecutionAttemptDetailsInfo.create(
-                            execution, metricFetcher, jobID, jobVertexID));
+                            execution, metricFetcher, jobID, jobVertexID, otherConcurrentAttempts));

Review Comment:
   I'd prefer not to change the existing code that declares the variable, as long as the naming is not really confusing here.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r929628681


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java:
##########
@@ -70,11 +76,20 @@ public ClusterOverviewWithVersion(
             int numTaskManagersConnected,
             int numSlotsTotal,
             int numSlotsAvailable,
+            int numTaskManagersBlocked,
+            int numSlotsBlocked,
             JobsOverview jobs1,
             JobsOverview jobs2,
             String version,
             String commitId) {
-        super(numTaskManagersConnected, numSlotsTotal, numSlotsAvailable, jobs1, jobs2);

Review Comment:
   It seems this method is no longer used and can be removed?
   Then we can also remove the `ClusterOverview` constructor it relies on, because that constructor is only used here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java:
##########
@@ -44,11 +51,21 @@ public class ClusterOverview extends JobsOverview {
     @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE)
     private final int numSlotsAvailable;
 
+    @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final int numTaskManagersBlocked;
+
+    @JsonProperty(FIELD_NAME_SLOTS_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final int numSlotsBlocked;
+
     @JsonCreator
     public ClusterOverview(
             @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
             @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
             @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+            @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) @Nullable Integer numTaskManagersBlocked,
+            @JsonProperty(FIELD_NAME_SLOTS_BLOCKED) @Nullable Integer numSlotsBlocked,

Review Comment:
   numSlotsBlocked -> numSlotsFreeAndBlocked



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java:
##########
@@ -19,21 +19,28 @@
 package org.apache.flink.runtime.messages.webmonitor;
 
 import org.apache.flink.runtime.resourcemanager.ResourceOverview;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import javax.annotation.Nullable;
+
 /**
  * Response to the {@link RequestStatusOverview} message, carrying a description of the Flink
  * cluster status.
  */
-public class ClusterOverview extends JobsOverview {
+public class ClusterOverview extends JobsOverview implements ResponseBody {

Review Comment:
   Maybe there is no need to implement `ResponseBody`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java:
##########
@@ -100,6 +129,7 @@ public JobDetails(
                 ExecutionState.values().length);
         this.tasksPerState = checkNotNull(tasksPerState);
         this.numTasks = numTasks;
+        this.currentExecutionAttempts = currentExecutionAttempts;

Review Comment:
   ```suggestion
           this.currentExecutionAttempts = checkNotNull(currentExecutionAttempts);
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java:
##########
@@ -76,6 +80,8 @@ public class JobDetails implements Serializable {
 
     private final int numTasks;
 
+    private final Map<String, Map<Integer, Integer>> currentExecutionAttempts;

Review Comment:
   It would be better to add a comment explaining what the keys and values stand for, because it's hard to reason about them at first glance.
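
   For illustration, one possible wording for such a comment, assuming the keys are the ones used when the map is consumed in MetricStore:

   ```java
   /**
    * Map from task (job vertex) ID to a map from subtask index to the current
    * execution attempt number of that subtask.
    */
   private final Map<String, Map<Integer, Integer>> currentExecutionAttempts;
   ```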



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java:
##########
@@ -76,6 +80,8 @@ public class JobDetails implements Serializable {
 
     private final int numTasks;
 
+    private final Map<String, Map<Integer, Integer>> currentExecutionAttempts;
+
     public JobDetails(

Review Comment:
   I guess this method is now only used by tests? If so, we should mark it as `@VisibleForTesting`.





[GitHub] [flink] flinkbot commented on pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20296:
URL: https://github.com/apache/flink/pull/20296#issuecomment-1187017010

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "10d04c413255556cf0c2114ae28ee514016c3d5e",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "10d04c413255556cf0c2114ae28ee514016c3d5e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 10d04c413255556cf0c2114ae28ee514016c3d5e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] zhuzhurk commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932927829


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ArchivedExecutionGraphTestUtils {

Review Comment:
   This util class and its methods can be package-private.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java:
##########
@@ -175,45 +180,48 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
 
             MutableIOMetrics counts = new MutableIOMetrics();
 
-            for (AccessExecutionVertex vertex : taskVertices) {
-                final ExecutionState state = vertex.getExecutionState();
-                tasksPerState[state.ordinal()]++;
+            int representativeAttemptsCount = 0;
+            for (AccessExecution execution : executions) {
+                final ExecutionState state = execution.getState();
+                executionsPerState[state.ordinal()]++;
+                if (representativeExecutions.contains(execution)) {
+                    tasksPerState[state.ordinal()]++;
+                    representativeAttemptsCount += 1;

Review Comment:
   nit: representativeAttemptsCount++;



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link SpeculativeExecutionVertex}. */
+class ArchivedExecutionVertexWithSpeculativeExecutionTest {
+
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    private TestingInternalFailuresListener internalFailuresListener;
+
+    @BeforeEach
+    void setUp() {
+        internalFailuresListener = new TestingInternalFailuresListener();
+    }
+
+    @Test
+    void testCreateSpeculativeExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        assertThat(ev.getCurrentExecutions()).hasSize(1);

Review Comment:
   This comment also applies to other tests in this class.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ArchivedExecutionGraphTestUtils {

Review Comment:
   The util class should have a private constructor to prevent instantiation. See `SchedulerTestingUtils` for a reference.
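
   A sketch of the suggested shape (class and method names come from the quoted code; visibility reduced to package-private):

   ```java
   class ArchivedExecutionGraphTestUtils {

       private ArchivedExecutionGraphTestUtils() {
           // util class, not meant to be instantiated
       }

       static void compareExecutionVertex(
               AccessExecutionVertex runtimeVertex, AccessExecutionVertex archivedVertex) {
           // package-private assertion helpers go here
       }
   }
   ```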





[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r931909030


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -391,4 +476,36 @@ private static TaskMetricStore unmodifiable(TaskMetricStore source) {
                     unmodifiableMap(source.metrics), unmodifiableMap(source.subtasks));
         }
     }
+
+    /** Sub-structure containing metrics of a single subtask. */
+    @ThreadSafe
+    public static class SubtaskMetricStore extends ComponentMetricStore {
+        private final Map<Integer, ComponentMetricStore> attempts;
+
+        private SubtaskMetricStore() {
+            this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>());
+        }
+
+        private SubtaskMetricStore(
+                Map<String, String> metrics, Map<Integer, ComponentMetricStore> attempts) {
+            super(metrics);
+            this.attempts = checkNotNull(attempts);
+        }
+
+        private static SubtaskMetricStore unmodifiable(SubtaskMetricStore source) {

Review Comment:
   This method is not used?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -214,15 +260,33 @@ public void add(MetricDump metric) {
                     task = job.tasks.computeIfAbsent(taskInfo.vertexID, k -> new TaskMetricStore());
                     subtask =
                             task.subtasks.computeIfAbsent(
-                                    taskInfo.subtaskIndex, k -> new ComponentMetricStore());
+                                    taskInfo.subtaskIndex, k -> new SubtaskMetricStore());
+
+                    if (taskInfo.attemptNumber >= 0) {
+                        // Consider as the current attempt if current attempt id is not set, which
+                        // means there should be only one execution
+                        isCurrentAttempt =
+                                Optional.of(currentExecutionAttempts)
+                                                .map(m -> m.get(taskInfo.jobID))
+                                                .map(m -> m.get(taskInfo.vertexID))
+                                                .map(m -> m.get(taskInfo.subtaskIndex))
+                                                .orElse(taskInfo.attemptNumber)
+                                        == taskInfo.attemptNumber;
+                        attempt =
+                                subtask.attempts.computeIfAbsent(
+                                        taskInfo.attemptNumber, k -> new ComponentMetricStore());
+                        addMetric(attempt.metrics, name, metric);
+                    }
                     /**
                      * The duplication is intended. Metrics scoped by subtask are useful for several
                      * job/task handlers, while the WebInterface task metric queries currently do
                      * not account for subtasks, so we don't divide by subtask and instead use the
                      * concatenation of subtask index and metric name as the name for those.
                      */
-                    addMetric(subtask.metrics, name, metric);
-                    addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric);
+                    if (isCurrentAttempt) {

Review Comment:
   Maybe move the above comment inside the `if` block?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);

Review Comment:
   We could make `subtaskMetricStores` a `Map<Integer, SubtaskMetricStore>` and use `entry.getValue()` directly.
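
   A sketch of that simplification, assuming `getAllSubtaskMetricStores()` can be typed to expose `SubtaskMetricStore` values:

   ```java
   Map<Integer, SubtaskMetricStore> subtaskMetricStores =
           taskMetricStore.getAllSubtaskMetricStores();
   for (Map.Entry<Integer, SubtaskMetricStore> entry : subtaskMetricStores.entrySet()) {
       int subtaskIndex = entry.getKey();
       SubtaskMetricStore subtaskMetricStore = entry.getValue(); // no second lookup needed
       // ...
   }
   ```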



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -233,21 +297,42 @@ public void add(MetricDump metric) {
                                     operatorInfo.vertexID, k -> new TaskMetricStore());
                     subtask =
                             task.subtasks.computeIfAbsent(
-                                    operatorInfo.subtaskIndex, k -> new ComponentMetricStore());
+                                    operatorInfo.subtaskIndex, k -> new SubtaskMetricStore());
+
+                    if (operatorInfo.attemptNumber >= 0) {

Review Comment:
   Could we extract a helper method shared by the subtask and operator scopes?
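
   A sketch of such a helper; the name and signature are hypothetical, and the body mirrors the duplicated logic in the two branches:

   ```java
   // Stores the metric under the attempt's store and reports whether this attempt
   // is the current one (a missing entry is treated as "current").
   private boolean addAttemptMetricAndCheckCurrent(
           SubtaskMetricStore subtask,
           String jobID,
           String vertexID,
           int subtaskIndex,
           int attemptNumber,
           String name,
           MetricDump metric) {
       ComponentMetricStore attempt =
               subtask.attempts.computeIfAbsent(attemptNumber, k -> new ComponentMetricStore());
       addMetric(attempt.metrics, name, metric);
       return Optional.of(currentExecutionAttempts)
                       .map(m -> m.get(jobID))
                       .map(m -> m.get(vertexID))
                       .map(m -> m.get(subtaskIndex))
                       .orElse(attemptNumber)
               == attemptNumber;
   }
   ```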



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java:
##########
@@ -127,22 +128,24 @@ private static JobExceptionsInfoWithHistory createJobExceptionsInfo(
         List<JobExceptionsInfo.ExecutionExceptionInfo> taskExceptionList = new ArrayList<>();
         boolean truncated = false;
         for (AccessExecutionVertex task : executionGraph.getAllExecutionVertices()) {
-            Optional<ErrorInfo> failure = task.getFailureInfo();
-            if (failure.isPresent()) {
-                if (taskExceptionList.size() >= exceptionToReportMaxSize) {
-                    truncated = true;
-                    break;
+            for (AccessExecution execution : task.getCurrentExecutions()) {
+                Optional<ErrorInfo> failure = execution.getFailureInfo();
+                if (failure.isPresent()) {
+                    if (taskExceptionList.size() >= exceptionToReportMaxSize) {
+                        truncated = true;
+                        break;
+                    }
+
+                    TaskManagerLocation location = execution.getAssignedResourceLocation();
+                    String locationString = toString(location);
+                    long timestamp = execution.getStateTimestamp(ExecutionState.FAILED);
+                    taskExceptionList.add(
+                            new JobExceptionsInfo.ExecutionExceptionInfo(
+                                    failure.get().getExceptionAsString(),
+                                    task.getTaskNameWithSubtaskIndex(),

Review Comment:
   It seems the page currently does not show attempt information. Should we modify this to distinguish the errors of different attempts?
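
   For illustration, a sketch of one way to make the attempt visible in the exception entry (the label format and trailing constructor arguments are assumptions):

   ```java
   taskExceptionList.add(
           new JobExceptionsInfo.ExecutionExceptionInfo(
                   failure.get().getExceptionAsString(),
                   task.getTaskNameWithSubtaskIndex()
                           + " (attempt #" + execution.getAttemptNumber() + ")",
                   locationString,
                   timestamp));
   ```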



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java:
##########
@@ -120,9 +120,24 @@ private static JobVertexDetailsInfo createJobVertexDetailsInfo(
         for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
             final AccessExecution execution = vertex.getCurrentExecutionAttempt();
             final JobVertexID jobVertexID = jobVertex.getJobVertexId();
+
+            final Collection<AccessExecution> attempts = vertex.getCurrentExecutions();
+            List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts = null;
+
+            if (attempts.size() > 1) {
+                otherConcurrentAttempts = new ArrayList<>();
+                for (AccessExecution attempt : attempts) {
+                    if (attempt.getAttemptNumber() != execution.getAttemptNumber()) {
+                        otherConcurrentAttempts.add(
+                                SubtaskExecutionAttemptDetailsInfo.create(
+                                        attempt, metricFetcher, jobID, jobVertexID, null));
+                    }
+                }
+            }
+
             subtasks.add(
                     SubtaskExecutionAttemptDetailsInfo.create(
-                            execution, metricFetcher, jobID, jobVertexID));
+                            execution, metricFetcher, jobID, jobVertexID, otherConcurrentAttempts));

Review Comment:
   Might rename `execution` to `currentAttempt` 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -214,15 +260,33 @@ public void add(MetricDump metric) {
                     task = job.tasks.computeIfAbsent(taskInfo.vertexID, k -> new TaskMetricStore());
                     subtask =
                             task.subtasks.computeIfAbsent(
-                                    taskInfo.subtaskIndex, k -> new ComponentMetricStore());
+                                    taskInfo.subtaskIndex, k -> new SubtaskMetricStore());
+
+                    if (taskInfo.attemptNumber >= 0) {
+                        // Consider as the current attempt if current attempt id is not set, which
+                        // means there should be only one execution
+                        isCurrentAttempt =
+                                Optional.of(currentExecutionAttempts)
+                                                .map(m -> m.get(taskInfo.jobID))
+                                                .map(m -> m.get(taskInfo.vertexID))
+                                                .map(m -> m.get(taskInfo.subtaskIndex))
+                                                .orElse(taskInfo.attemptNumber)
+                                        == taskInfo.attemptNumber;
+                        attempt =
+                                subtask.attempts.computeIfAbsent(

Review Comment:
   It seems a bit weird to break encapsulation here and directly modify the attempts, but we can consider that separately.





[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932886544


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -391,4 +476,36 @@ private static TaskMetricStore unmodifiable(TaskMetricStore source) {
                     unmodifiableMap(source.metrics), unmodifiableMap(source.subtasks));
         }
     }
+
+    /** Sub-structure containing metrics of a single subtask. */
+    @ThreadSafe
+    public static class SubtaskMetricStore extends ComponentMetricStore {
+        private final Map<Integer, ComponentMetricStore> attempts;
+
+        private SubtaskMetricStore() {
+            this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>());
+        }
+
+        private SubtaskMetricStore(
+                Map<String, String> metrics, Map<Integer, ComponentMetricStore> attempts) {
+            super(metrics);
+            this.attempts = checkNotNull(attempts);
+        }
+
+        private static SubtaskMetricStore unmodifiable(SubtaskMetricStore source) {

Review Comment:
   It was a bug that `getSubtaskMetricStore` did not use this method. That is now fixed, so the method is used.





[GitHub] [flink] pltbkd commented on pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on PR #20296:
URL: https://github.com/apache/flink/pull/20296#issuecomment-1198920178

   Hi @zhuzhurk and @gaoyunhaii,
   Thanks for reviewing again!
   A new fixup commit has been pushed that resolves most of the comments. I'll separate and squash the fixes into the original commits once the review is done.




[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932351357


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -40,15 +44,27 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 
     private final ArchivedExecution currentExecution; // this field must never be null
 
+    private final Collection<AccessExecution> currentExecutions;
+
     // ------------------------------------------------------------------------
 
     public ArchivedExecutionVertex(ExecutionVertex vertex) {
         this.subTaskIndex = vertex.getParallelSubtaskIndex();
         this.executionHistory = getCopyOfExecutionHistory(vertex);
         this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
-        this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
+
+        Execution vertexCurrentExecution = vertex.getCurrentExecutionAttempt();
+        currentExecutions = new ArrayList<>(vertex.getCurrentExecutions().size());
+        currentExecution = vertexCurrentExecution.archive();
+        currentExecutions.add(currentExecution);
+        for (Execution execution : vertex.getCurrentExecutions()) {

Review Comment:
   The representative one has already been added a few lines above. Since `Execution.archive` creates a new `ArchivedExecution`, it is added there and excluded here to ensure that `currentExecutions` contains `currentExecution`.
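
   A sketch of how the quoted loop presumably continues under that reasoning (the attempt-number comparison is an assumption):

   ```java
   for (Execution execution : vertex.getCurrentExecutions()) {
       // the representative execution was archived and added above, so skip it here
       if (execution.getAttemptNumber() != vertexCurrentExecution.getAttemptNumber()) {
           currentExecutions.add(execution.archive());
       }
   }
   ```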





[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932843818


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java:
##########
@@ -131,18 +132,23 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
         Map<String, String> taskManagerId2Host = new HashMap<>();
         Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
         for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
-            TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
-            String taskManagerHost =
-                    location == null
-                            ? "(unassigned)"
-                            : location.getHostname() + ':' + location.dataPort();
-            String taskmanagerId =
-                    location == null ? "(unassigned)" : location.getResourceID().toString();
-            taskManagerId2Host.put(taskmanagerId, taskManagerHost);
-            List<AccessExecutionVertex> vertices =
-                    taskManagerVertices.computeIfAbsent(
-                            taskmanagerId, ignored -> new ArrayList<>(4));
-            vertices.add(vertex);
+            List<TaskManagerLocation> locations =
+                    vertex.getCurrentExecutions().stream()
+                            .map(AccessExecution::getAssignedResourceLocation)
+                            .collect(Collectors.toList());
+            for (TaskManagerLocation location : locations) {
+                String taskManagerHost =
+                        location == null
+                                ? "(unassigned)"
+                                : location.getHostname() + ':' + location.dataPort();
+                String taskmanagerId =
+                        location == null ? "(unassigned)" : location.getResourceID().toString();
+                taskManagerId2Host.put(taskmanagerId, taskManagerHost);
+                List<AccessExecutionVertex> vertices =
+                        taskManagerVertices.computeIfAbsent(
+                                taskmanagerId, ignored -> new ArrayList<>(4));

Review Comment:
   OK, I'll remove it.





[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932889525


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java:
##########
@@ -261,6 +261,53 @@ public static SubtaskExecutionAttemptDetailsInfo create(
                 duration,
                 ioMetricsInfo,
                 taskmanagerId,
-                getExecutionStateDuration(execution));
+                getExecutionStateDuration(execution),
+                otherConcurrentAttempts);
+    }
+
+    public List<SubtaskExecutionAttemptDetailsInfo> getOtherConcurrentAttempts() {

Review Comment:
   Quite a few getters in this class are never called, so I'm not sure whether we should add this getter or not. I'll move this method next to the other getters for now. Please let me know if it should actually be removed.





[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932426992


##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java:
##########
@@ -44,11 +50,22 @@ public class ClusterOverview extends JobsOverview {
     @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE)
     private final int numSlotsAvailable;
 
+    @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final int numTaskManagersBlocked;
+
+    @JsonProperty(FIELD_NAME_SLOTS_FREE_AND_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final int numSlotsFreeAndBlocked;
+
     @JsonCreator
     public ClusterOverview(
             @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
             @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
             @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+            @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) @Nullable Integer numTaskManagersBlocked,

Review Comment:
   The constructor is used as the JSON creator by Jackson. If the JSON doesn't contain the field, Jackson will pass null as the parameter value, which would cause an NPE for a primitive parameter. We can hit this case if the HistoryServer archived the graph with an older version of Flink but reads it with a newer one.
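
   A sketch of the null-tolerant creator this implies (field names from the diff; defaulting to zero is an assumption):

   ```java
   // An older archive without these fields deserializes them as null; map null to
   // the primitive default instead of unboxing it and triggering an NPE.
   this.numTaskManagersBlocked = numTaskManagersBlocked == null ? 0 : numTaskManagersBlocked;
   this.numSlotsFreeAndBlocked = numSlotsFreeAndBlocked == null ? 0 : numSlotsFreeAndBlocked;
   ```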





[GitHub] [flink] pltbkd commented on pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on PR #20296:
URL: https://github.com/apache/flink/pull/20296#issuecomment-1195245339

   @zhuzhurk 
   Thanks a lot for the review!
   I have split the PR into more meaningful commits, and most of your suggestions have been taken, except for the Jackson one, to which I have replied with the reason.
   Would you please take another look at the PR?




[GitHub] [flink] zhuzhurk commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r931837487


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java:
##########
@@ -227,7 +246,8 @@ public boolean equals(Object o) {
                 && Objects.equals(resourceId, that.resourceId)
                 && Objects.equals(address, that.address)
                 && Objects.equals(hardwareDescription, that.hardwareDescription)
-                && Objects.equals(memoryConfiguration, that.memoryConfiguration);
+                && Objects.equals(memoryConfiguration, that.memoryConfiguration)
+                && Objects.equals(blocked, that.blocked);

Review Comment:
   -> `&& blocked == that.blocked;`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -688,15 +688,31 @@ public CompletableFuture<TaskManagerInfoWithSlots> requestTaskManagerDetailsInfo
     @Override
     public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
         final int numberSlots = slotManager.getNumberRegisteredSlots();
-        final int numberFreeSlots = slotManager.getNumberFreeSlots();
         final ResourceProfile totalResource = slotManager.getRegisteredResource();
-        final ResourceProfile freeResource = slotManager.getFreeResource();
 
+        int numberFreeSlots = slotManager.getNumberFreeSlots();
+        ResourceProfile freeResource = slotManager.getFreeResource();
+
+        int blockedTaskManagers = 0;
+        int totalBlockedSlots = 0;

Review Comment:
   totalBlockedSlots -> totalBlockedFreeSlots



##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java:
##########
@@ -67,7 +67,7 @@ public class TaskManagerDetailsHandlerTest extends TestLogger {
 
     private TaskManagerDetailsHandler testInstance;
 
-    @Before
+    @BeforeEach

Review Comment:
   This change had better be part of the commit that migrates the tests to JUnit 5.
   The usages of Hamcrest should be refactored as well.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -688,15 +688,31 @@ public CompletableFuture<TaskManagerInfoWithSlots> requestTaskManagerDetailsInfo
     @Override
     public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
         final int numberSlots = slotManager.getNumberRegisteredSlots();
-        final int numberFreeSlots = slotManager.getNumberFreeSlots();
         final ResourceProfile totalResource = slotManager.getRegisteredResource();
-        final ResourceProfile freeResource = slotManager.getFreeResource();
 
+        int numberFreeSlots = slotManager.getNumberFreeSlots();
+        ResourceProfile freeResource = slotManager.getFreeResource();
+
+        int blockedTaskManagers = 0;
+        int totalBlockedSlots = 0;
+        for (WorkerRegistration<WorkerType> registration : taskExecutors.values()) {
+            if (blocklistHandler.isBlockedTaskManager(registration.getResourceID())) {
+                blockedTaskManagers++;
+                int blockedSlots = slotManager.getNumberFreeSlotsOf(registration.getInstanceID());

Review Comment:
   blockedSlots -> blockedFreeSlots



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link SpeculativeExecutionVertex}. */

Review Comment:
   maybe: Tests for the {@link ArchivedExecutionVertex} created from a {@link SpeculativeExecutionVertex}.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -156,6 +179,27 @@ public synchronized ComponentMetricStore getSubtaskMetricStore(
         return ComponentMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex));
     }
 
+    public synchronized ComponentMetricStore getSubtaskAttemptMetricStore(
+            String jobID, String taskID, int subtaskIndex, int attemptId) {
+        JobMetricStore job = jobID == null ? null : jobs.get(jobID);
+        if (job == null) {
+            return null;
+        }
+        TaskMetricStore task = job.getTaskMetricStore(taskID);
+        if (task == null) {
+            return null;
+        }
+        SubtaskMetricStore subtask = task.getSubtaskMetricStore(subtaskIndex);
+        if (attemptId < 0) {

Review Comment:
   A negative attempt number should never happen. Therefore a `checkArgument(attemptNumber >= 0)` is enough.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -214,15 +260,33 @@ public void add(MetricDump metric) {
                     task = job.tasks.computeIfAbsent(taskInfo.vertexID, k -> new TaskMetricStore());
                     subtask =
                             task.subtasks.computeIfAbsent(
-                                    taskInfo.subtaskIndex, k -> new ComponentMetricStore());
+                                    taskInfo.subtaskIndex, k -> new SubtaskMetricStore());
+
+                    if (taskInfo.attemptNumber >= 0) {
+                        // Consider as the current attempt if current attempt id is not set, which

Review Comment:
   This doc seems to be outdated



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java:
##########
@@ -670,6 +677,95 @@ void testNewlyAddedBlockedNodesWillBeSynchronizedToAllRegisteredJobMasters() thr
         assertThat(receivedBlockedNodes2).containsExactlyInAnyOrder(blockedNode1, blockedNode2);
     }
 
+    @Test
+    void testResourceOverviewWithBlockedSlots() throws Exception {
+        ManuallyTriggeredScheduledExecutor executor = new ManuallyTriggeredScheduledExecutor();
+        final SlotManager slotManager = DeclarativeSlotManagerBuilder.newBuilder(executor).build();
+        resourceManager =
+                new ResourceManagerBuilder()
+                        .withSlotManager(slotManager)
+                        .withBlocklistHandlerFactory(
+                                new DefaultBlocklistHandler.Factory(Duration.ofMillis(100L)))
+                        .buildAndStart();
+
+        final ResourceManagerGateway resourceManagerGateway =
+                resourceManager.getSelfGateway(ResourceManagerGateway.class);
+
+        ResourceID taskExecutor = ResourceID.generate();
+        ResourceID taskExecutorToBlock = ResourceID.generate();
+        registerTaskExecutorAndSlot(resourceManagerGateway, taskExecutor, 3);
+        registerTaskExecutorAndSlot(resourceManagerGateway, taskExecutorToBlock, 5);
+        executor.triggerAll();
+
+        ResourceOverview overview =
+                resourceManagerGateway.requestResourceOverview(Time.seconds(5)).get();
+        assertThat(overview.getNumberTaskManagers()).isEqualTo(2);
+        assertThat(overview.getNumberRegisteredSlots()).isEqualTo(8);
+        assertThat(overview.getNumberFreeSlots()).isEqualTo(8);
+        assertThat(overview.getNumberBlockedTaskManagers()).isEqualTo(0);
+        assertThat(overview.getNumberBlockedFreeSlots()).isEqualTo(0);
+        assertThat(overview.getTotalResource())
+                .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(8));
+        assertThat(overview.getFreeResource())
+                .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(8));
+
+        resourceManagerGateway.notifyNewBlockedNodes(
+                Collections.singleton(
+                        new BlockedNode(
+                                resourceManager.getNodeIdOfTaskManager(taskExecutorToBlock),
+                                "Test cause",
+                                System.currentTimeMillis())));

Review Comment:
   Better to use `Long.MAX_VALUE`, in case the node gets unblocked in another thread and makes the test unstable.
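   
   A minimal sketch of the suggested change, assuming the last constructor argument of `BlockedNode` is the end timestamp of the blocked period:
   ```
   resourceManagerGateway.notifyNewBlockedNodes(
           Collections.singleton(
                   new BlockedNode(
                           resourceManager.getNodeIdOfTaskManager(taskExecutorToBlock),
                           "Test cause",
                           // block "forever" so a concurrent unblock cannot race the assertions
                           Long.MAX_VALUE)));
   ```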



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -214,15 +260,33 @@ public void add(MetricDump metric) {
                     task = job.tasks.computeIfAbsent(taskInfo.vertexID, k -> new TaskMetricStore());
                     subtask =
                             task.subtasks.computeIfAbsent(
-                                    taskInfo.subtaskIndex, k -> new ComponentMetricStore());
+                                    taskInfo.subtaskIndex, k -> new SubtaskMetricStore());
+
+                    if (taskInfo.attemptNumber >= 0) {
+                        // Consider as the current attempt if current attempt id is not set, which

Review Comment:
   id -> number



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -177,7 +221,9 @@ public void add(MetricDump metric) {
             TaskManagerMetricStore tm;
             JobMetricStore job;
             TaskMetricStore task;
-            ComponentMetricStore subtask;
+            SubtaskMetricStore subtask;
+            ComponentMetricStore attempt;
+            boolean isCurrentAttempt = true;

Review Comment:
   Seems this ` = true` is of no use.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java:
##########
@@ -0,0 +1,268 @@
+/** Tests for the {@link SpeculativeExecutionVertex}. */
+class ArchivedExecutionVertexWithSpeculativeExecutionTest {
+
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    private TestingInternalFailuresListener internalFailuresListener;
+
+    @BeforeEach
+    void setUp() {
+        internalFailuresListener = new TestingInternalFailuresListener();
+    }
+
+    @Test
+    void testCreateSpeculativeExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+
+        ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        assertThat(ev.getCurrentExecutions()).hasSize(2);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testResetExecutionVertex() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        e1.transitionState(ExecutionState.RUNNING);
+        e1.markFinished();
+        e2.cancel();
+        ev.resetForNewExecution();
+
+        assertThat(
+                        ev.getExecutionHistory()
+                                .getHistoricalExecution(0)
+                                .orElseThrow(NullPointerException::new)
+                                .getAttemptId())
+                .isEqualTo(e1.getAttemptId());
+        assertThat(
+                        ev.getExecutionHistory()
+                                .getHistoricalExecution(1)
+                                .orElseThrow(NullPointerException::new)
+                                .getAttemptId())
+                .isEqualTo(e2.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.getCurrentExecutionAttempt().getAttemptNumber()).isEqualTo(2);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testCancel() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.cancel();
+        assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED);
+        assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testSuspend() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.suspend();
+        assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED);
+        assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testFail() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.fail(new Exception("Forced test failure."));
+        assertThat(internalFailuresListener.getFailedTasks())
+                .containsExactly(e1.getAttemptId(), e2.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testMarkFailed() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.markFailed(new Exception("Forced test failure."));
+        assertThat(internalFailuresListener.getFailedTasks())
+                .containsExactly(e1.getAttemptId(), e2.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testVertexTerminationAndJobTermination() throws Exception {
+        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+        final ExecutionGraph eg = createExecutionGraph(jobGraph);
+        eg.transitionToRunning();
+
+        ExecutionJobVertex jv = eg.getJobVertex(jobVertex.getID());
+        assert jv != null;
+        final SpeculativeExecutionVertex ev = (SpeculativeExecutionVertex) jv.getTaskVertices()[0];
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        final CompletableFuture<?> terminationFuture = ev.getTerminationFuture();
+
+        e1.transitionState(ExecutionState.RUNNING);
+        e1.markFinished();
+        assertThat(terminationFuture.isDone()).isFalse();
+        assertThat(eg.getState()).isSameAs(JobStatus.RUNNING);
+
+        e2.cancel();
+        assertThat(terminationFuture.isDone()).isTrue();
+        assertThat(eg.getState()).isSameAs(JobStatus.FINISHED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testArchiveFailedExecutions() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.RUNNING);
+
+        final Execution e2 = ev.createNewSpeculativeExecution(0);
+        e2.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e2.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.currentExecution).isSameAs(e1);
+
+        final Execution e3 = ev.createNewSpeculativeExecution(0);
+        e3.transitionState(ExecutionState.RUNNING);
+        e1.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e1.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.currentExecution).isSameAs(e3);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testArchiveTheOnlyCurrentExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e1.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testGetExecutionState() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.CANCELED);
+
+        // the latter added state is more likely to reach FINISH state
+        final List<ExecutionState> statesSortedByPriority = new ArrayList<>();
+        statesSortedByPriority.add(ExecutionState.FAILED);
+        statesSortedByPriority.add(ExecutionState.CANCELING);
+        statesSortedByPriority.add(ExecutionState.CREATED);
+        statesSortedByPriority.add(ExecutionState.SCHEDULED);
+        statesSortedByPriority.add(ExecutionState.DEPLOYING);
+        statesSortedByPriority.add(ExecutionState.INITIALIZING);
+        statesSortedByPriority.add(ExecutionState.RUNNING);
+        statesSortedByPriority.add(ExecutionState.FINISHED);
+
+        for (ExecutionState state : statesSortedByPriority) {
+            final Execution execution = ev.createNewSpeculativeExecution(0);
+            execution.transitionState(state);
+        }
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception {
+        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        ExecutionJobVertex jv = executionGraph.getJobVertex(jobVertex.getID());
+        assert jv != null;

Review Comment:
   This `assert` should be avoided. `AssertJ` assertions should be used instead.
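   
   For illustration, a minimal sketch of the replacement (using the static `assertThat` import already present in this test):
   ```
   // prefer an AssertJ assertion over the JVM `assert` keyword,
   // which is disabled unless the JVM runs with -ea
   assertThat(jv).isNotNull();
   ```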



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -156,6 +179,27 @@ public synchronized ComponentMetricStore getSubtaskMetricStore(
         return ComponentMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex));
     }
 
+    public synchronized ComponentMetricStore getSubtaskAttemptMetricStore(
+            String jobID, String taskID, int subtaskIndex, int attemptId) {

Review Comment:
   attemptId -> attemptNumber



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -214,15 +260,33 @@ public void add(MetricDump metric) {
                     task = job.tasks.computeIfAbsent(taskInfo.vertexID, k -> new TaskMetricStore());
                     subtask =
                             task.subtasks.computeIfAbsent(
-                                    taskInfo.subtaskIndex, k -> new ComponentMetricStore());
+                                    taskInfo.subtaskIndex, k -> new SubtaskMetricStore());
+
+                    if (taskInfo.attemptNumber >= 0) {

Review Comment:
   Seems this check is not needed?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java:
##########
@@ -0,0 +1,268 @@
+    @Test
+    void testCreateSpeculativeExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        assertThat(ev.getCurrentExecutions()).hasSize(1);

Review Comment:
   I think the main verification of this test is the `ArchivedExecutionGraphTestUtils.compareExecutionVertex(...)`.
   The other assertions are already done in `SpeculativeExecutionVertexTest` and can be excluded from this test class.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java:
##########
@@ -0,0 +1,268 @@
+    @Test
+    void testGetExecutionState() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.CANCELED);
+
+        // the latter added state is more likely to reach FINISH state
+        final List<ExecutionState> statesSortedByPriority = new ArrayList<>();
+        statesSortedByPriority.add(ExecutionState.FAILED);
+        statesSortedByPriority.add(ExecutionState.CANCELING);
+        statesSortedByPriority.add(ExecutionState.CREATED);
+        statesSortedByPriority.add(ExecutionState.SCHEDULED);
+        statesSortedByPriority.add(ExecutionState.DEPLOYING);
+        statesSortedByPriority.add(ExecutionState.INITIALIZING);
+        statesSortedByPriority.add(ExecutionState.RUNNING);
+        statesSortedByPriority.add(ExecutionState.FINISHED);
+
+        for (ExecutionState state : statesSortedByPriority) {
+            final Execution execution = ev.createNewSpeculativeExecution(0);
+            execution.transitionState(state);
+        }
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);

Review Comment:
   This check should be in the loop.
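   
   Roughly, something like this sketch, so that the archived vertex is verified after every state transition rather than only the final one:
   ```
   for (ExecutionState state : statesSortedByPriority) {
       final Execution execution = ev.createNewSpeculativeExecution(0);
       execution.transitionState(state);
       // compare the archived vertex after each transition
       ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, ev.archive());
   }
   ```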



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -54,6 +57,14 @@ public class MetricStore {
     private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>();
     private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
 
+    /**
+     * The map holds the attempt number of the representing execution for each subtask of each
+     * vertex. When a metric of an execution attempt is added, the metric can also be added to the
+     * SubtaskMetricStore when it is of the representing execution.
+     */
+    private final Map<String, Map<String, Map<Integer, Integer>>> currentExecutionAttempts =

Review Comment:
   Maybe name it `representativeExecutionAttempts` to make it more explanatory?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -177,7 +221,9 @@ public void add(MetricDump metric) {
             TaskManagerMetricStore tm;
             JobMetricStore job;
             TaskMetricStore task;
-            ComponentMetricStore subtask;
+            SubtaskMetricStore subtask;
+            ComponentMetricStore attempt;
+            boolean isCurrentAttempt = true;

Review Comment:
   Maybe naming it `isRepresentativeAttempt` would be more explanatory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r929534318


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java:
##########
@@ -113,7 +123,8 @@ public TaskManagerInfo(
             @JsonProperty(FIELD_NAME_TOTAL_RESOURCE) ResourceProfileInfo totalResource,
             @JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo freeResource,
             @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription,
-            @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration) {
+            @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration,
+            @JsonProperty(FIELD_NAME_BLOCKED) @Nullable Boolean blocked) {

Review Comment:
   With `@JsonInclude(Include.NON_DEFAULT)`, the JSON may not include the `blocked` field when it is false. We added the annotation because we don't want to change the REST API response for users who are not using speculative execution.
   However, when deserializing, Jackson will assign null to the parameter when the field is absent, which would cause an exception if the parameter were a primitive `boolean`.
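   
   A simplified sketch of the pattern (the class and field names below are stand-ins, not the actual Flink code; annotations are from `com.fasterxml.jackson.annotation`):
   ```
   class InfoSketch {
       // NON_DEFAULT: omit the field from the JSON while it is false, so the
       // response stays unchanged for setups without speculative execution
       @JsonProperty("blocked")
       @JsonInclude(JsonInclude.Include.NON_DEFAULT)
       private final boolean blocked;
   
       @JsonCreator
       InfoSketch(@JsonProperty("blocked") @Nullable Boolean blocked) {
           // a boxed Boolean lets Jackson pass null for an absent field;
           // a primitive parameter would fail during deserialization
           this.blocked = blocked != null && blocked;
       }
   }
   ```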





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r926305085


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -40,13 +43,25 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 
     private final ArchivedExecution currentExecution; // this field must never be null
 
+    private final Collection<AccessExecution> currentExecutions;
+
     // ------------------------------------------------------------------------
 
     public ArchivedExecutionVertex(ExecutionVertex vertex) {
         this.subTaskIndex = vertex.getParallelSubtaskIndex();
         this.executionHistory = getCopyOfExecutionHistory(vertex);
         this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
-        this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
+
+        Execution vertexCurrentExecution = vertex.getCurrentExecutionAttempt();
+        assert vertex.getCurrentExecutions().contains(vertexCurrentExecution);

Review Comment:
   Better to use `Preconditions.checkState`.
   
   And it's better to check it in a more performant way, like:
   ```
   checkState(vertexCurrentExecution == vertex.getCurrentExecution(vertexCurrentExecution.getAttemptNumber()));
   ```
   
   Or maybe we should not check it at all, since it should be guaranteed by the `ExecutionVertex` itself.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932835599


##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java:
##########
@@ -257,18 +258,19 @@ private Map<TaskManagerLocation, ImmutableSet<ExecutionAttemptID>> groupExecutio
                         executionVertex.getExecutionState());
                 continue;
             }
-            TaskManagerLocation tmLocation = executionVertex.getCurrentAssignedResourceLocation();
-            if (tmLocation == null) {
-                LOG.trace("ExecutionVertex {} is currently not assigned", executionVertex);
-                continue;
-            }
-            Set<ExecutionAttemptID> groupedAttemptIds =
-                    executionAttemptsByLocation.getOrDefault(tmLocation, new HashSet<>());
+            for (AccessExecution execution : executionVertex.getCurrentExecutions()) {

Review Comment:
   My main concern is whether the web UI can correctly handle this.





[GitHub] [flink] pltbkd commented on pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on PR #20296:
URL: https://github.com/apache/flink/pull/20296#issuecomment-1200741155

   Thank you for all the comments!
   
   I've squashed all fixups and rebased onto the latest master. The PR should be ready to merge once CI passes.




[GitHub] [flink] gaoyunhaii closed pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
gaoyunhaii closed pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution
URL: https://github.com/apache/flink/pull/20296




[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932427400


##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java:
##########
@@ -44,11 +50,22 @@ public class ClusterOverview extends JobsOverview {
     @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE)
     private final int numSlotsAvailable;
 
+    @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final int numTaskManagersBlocked;
+
+    @JsonProperty(FIELD_NAME_SLOTS_FREE_AND_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final int numSlotsFreeAndBlocked;
+
     @JsonCreator
     public ClusterOverview(
             @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
             @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
             @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+            @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) @Nullable Integer numTaskManagersBlocked,

Review Comment:
   The constructor is used as the JSON creator by Jackson. If the JSON doesn't contain the field, Jackson will use null as the field value, which would cause an NPE. We may hit this case if the HistoryServer archived the graph with an older Flink version but reads it with a newer one.
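   
   A sketch of the defensive handling inside the creator (simplified): keep the boxed parameters and fall back to the primitive default:
   ```
   // archives written by older Flink versions lack these fields, so Jackson
   // passes null; fall back to 0 instead of unboxing null and hitting an NPE
   this.numTaskManagersBlocked = numTaskManagersBlocked == null ? 0 : numTaskManagersBlocked;
   this.numSlotsFreeAndBlocked = numSlotsFreeAndBlocked == null ? 0 : numSlotsFreeAndBlocked;
   ```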





[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932468662


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -177,7 +221,9 @@ public void add(MetricDump metric) {
             TaskManagerMetricStore tm;
             JobMetricStore job;
             TaskMetricStore task;
-            ComponentMetricStore subtask;
+            SubtaskMetricStore subtask;
+            ComponentMetricStore attempt;
+            boolean isCurrentAttempt = true;

Review Comment:
   `isRepresentativeAttempt` is indeed a good name to explain the meaning here, but I would rather not introduce the "representative" naming in this place only. The `currentExecutionAttempts` in the JobDetails is collected via `Execution#getCurrentExecutionAttempt` and passed to other places. Maybe we can change the naming once we formally introduce the concept of a representative attempt for an Execution.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -54,6 +57,14 @@ public class MetricStore {
     private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>();
     private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
 
+    /**
+     * The map holds the attempt number of the representing execution for each subtask of each
+     * vertex. When a metric of an execution attempt is added, the metric can also be added to the
+     * SubtaskMetricStore when it is of the representing execution.
+     */
+    private final Map<String, Map<String, Map<Integer, Integer>>> currentExecutionAttempts =

Review Comment:
   I'd prefer to keep the naming; see the reply above.
   Or maybe we could rename only the map here in the MetricStore, without changing the other classes?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java:
##########
@@ -131,18 +132,23 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
         Map<String, String> taskManagerId2Host = new HashMap<>();
         Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
         for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
-            TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
-            String taskManagerHost =
-                    location == null
-                            ? "(unassigned)"
-                            : location.getHostname() + ':' + location.dataPort();
-            String taskmanagerId =
-                    location == null ? "(unassigned)" : location.getResourceID().toString();
-            taskManagerId2Host.put(taskmanagerId, taskManagerHost);
-            List<AccessExecutionVertex> vertices =
-                    taskManagerVertices.computeIfAbsent(
-                            taskmanagerId, ignored -> new ArrayList<>(4));
-            vertices.add(vertex);
+            List<TaskManagerLocation> locations =
+                    vertex.getCurrentExecutions().stream()
+                            .map(AccessExecution::getAssignedResourceLocation)
+                            .collect(Collectors.toList());
+            for (TaskManagerLocation location : locations) {
+                String taskManagerHost =
+                        location == null
+                                ? "(unassigned)"
+                                : location.getHostname() + ':' + location.dataPort();
+                String taskmanagerId =
+                        location == null ? "(unassigned)" : location.getResourceID().toString();
+                taskManagerId2Host.put(taskmanagerId, taskManagerHost);
+                List<AccessExecutionVertex> vertices =
+                        taskManagerVertices.computeIfAbsent(
+                                taskmanagerId, ignored -> new ArrayList<>(4));

Review Comment:
   Though I agree, I'd prefer not to change the existing code. 





[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r931890536


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java:
##########
@@ -47,6 +48,9 @@ public ClusterOverviewWithVersion(
             @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
             @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
             @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+            @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) @Nullable Integer numTaskManagersBlocked,

Review Comment:
   Similarly could we directly use `int` here?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -79,6 +96,11 @@ public ArchivedExecution getCurrentExecutionAttempt() {
         return currentExecution;
     }
 
+    @Override
+    public Collection<AccessExecution> getCurrentExecutions() {
+        return Collections.unmodifiableCollection(currentExecutions);

Review Comment:
   Considering how often this method may be called, I tend to prefer maintaining the current executions directly as an unmodifiable collection.
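   
   I.e., roughly this sketch: wrap once in the constructor instead of on every getter call:
   ```
   // constructor: wrap once, then hand out the same unmodifiable view
   this.currentExecutions = Collections.unmodifiableCollection(executions);
   
   @Override
   public Collection<AccessExecution> getCurrentExecutions() {
       return currentExecutions; // already unmodifiable, no per-call wrapping
   }
   ```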



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -40,15 +44,27 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 
     private final ArchivedExecution currentExecution; // this field must never be null
 
+    private final Collection<AccessExecution> currentExecutions;
+
     // ------------------------------------------------------------------------
 
     public ArchivedExecutionVertex(ExecutionVertex vertex) {
         this.subTaskIndex = vertex.getParallelSubtaskIndex();
         this.executionHistory = getCopyOfExecutionHistory(vertex);
         this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
-        this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
+
+        Execution vertexCurrentExecution = vertex.getCurrentExecutionAttempt();
+        currentExecutions = new ArrayList<>(vertex.getCurrentExecutions().size());
+        currentExecution = vertexCurrentExecution.archive();
+        currentExecutions.add(currentExecution);
+        for (Execution execution : vertex.getCurrentExecutions()) {

Review Comment:
   Judging from the other parts of the code, it seems we should not exclude the representing execution from this list?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -685,15 +688,31 @@ public CompletableFuture<TaskManagerInfoWithSlots> requestTaskManagerDetailsInfo
     @Override
     public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
         final int numberSlots = slotManager.getNumberRegisteredSlots();
-        final int numberFreeSlots = slotManager.getNumberFreeSlots();
         final ResourceProfile totalResource = slotManager.getRegisteredResource();
-        final ResourceProfile freeResource = slotManager.getFreeResource();
 
+        int numberFreeSlots = slotManager.getNumberFreeSlots();

Review Comment:
   We might skip the added checks if `blocklistHandler.getAllBlockedNodeIds().isEmpty()`?
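   
   I.e., a sketch along these lines, guarding the scan over the registered task executors shown in the diff:
   ```
   // fast path: nothing is blocked, so skip the per-TaskManager scan entirely
   if (!blocklistHandler.getAllBlockedNodeIds().isEmpty()) {
       for (WorkerRegistration<WorkerType> registration : taskExecutors.values()) {
           if (blocklistHandler.isBlockedTaskManager(registration.getResourceID())) {
               blockedTaskManagers++;
               totalBlockedSlots +=
                       slotManager.getNumberFreeSlotsOf(registration.getInstanceID());
           }
       }
   }
   ```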



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java:
##########
@@ -44,11 +50,22 @@ public class ClusterOverview extends JobsOverview {
     @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE)
     private final int numSlotsAvailable;
 
+    @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final int numTaskManagersBlocked;
+
+    @JsonProperty(FIELD_NAME_SLOTS_FREE_AND_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final int numSlotsFreeAndBlocked;
+
     @JsonCreator
     public ClusterOverview(
             @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
             @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
             @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+            @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) @Nullable Integer numTaskManagersBlocked,

Review Comment:
   If we already use `int` for the field, why can't we also use `int` for the parameter? It seems we don't actually have situations that pass null.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java:
##########
@@ -271,6 +324,21 @@ public void serialize(
 
             jsonGenerator.writeEndObject();
 
+            if (jobDetails.currentExecutionAttempts != null

Review Comment:
   It seems `currentExecutionAttempts` cannot be null?





[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r931900912


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java:
##########
@@ -159,19 +149,11 @@ public void testSerialization() throws IOException, ClassNotFoundException {
         verifySerializability(archivedGraph);
     }
 
-    @Test
-    public void testCreateFromInitializingJobForSuspendedJob() {

Review Comment:
   It must be a mistake. Sorry for that; I'll move them back.





[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932819448


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java:
##########
@@ -261,6 +261,53 @@ public static SubtaskExecutionAttemptDetailsInfo create(
                 duration,
                 ioMetricsInfo,
                 taskmanagerId,
-                getExecutionStateDuration(execution));
+                getExecutionStateDuration(execution),
+                otherConcurrentAttempts);
+    }
+
+    public List<SubtaskExecutionAttemptDetailsInfo> getOtherConcurrentAttempts() {

Review Comment:
   It seems this method is never called?





[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932906285


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java:
##########
@@ -182,30 +188,32 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
                 allFinished &= state.isTerminal();
                 endTime = Math.max(endTime, vertex.getStateTimestamp(state));
 
-                counts.addIOMetrics(
-                        vertex.getCurrentExecutionAttempt(),
-                        metricFetcher,
-                        jobID.toString(),
-                        jobVertex.getJobVertexId().toString());
-                MutableIOMetrics current = new MutableIOMetrics();
-                current.addIOMetrics(
-                        vertex.getCurrentExecutionAttempt(),
-                        metricFetcher,
-                        jobID.toString(),
-                        jobVertex.getJobVertexId().toString());
-                ioMetricsInfos.add(
-                        new IOMetricsInfo(
-                                current.getNumBytesIn(),
-                                current.isNumBytesInComplete(),
-                                current.getNumBytesOut(),
-                                current.isNumBytesOutComplete(),
-                                current.getNumRecordsIn(),
-                                current.isNumRecordsInComplete(),
-                                current.getNumRecordsOut(),
-                                current.isNumRecordsOutComplete(),
-                                current.getAccumulateBackPressuredTime(),
-                                current.getAccumulateIdleTime(),
-                                current.getAccumulateBusyTime()));
+                for (AccessExecution attempt : vertex.getCurrentExecutions()) {

Review Comment:
   I think it's better. I'll refactor the method in this way.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932832522


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -79,6 +96,11 @@ public ArchivedExecution getCurrentExecutionAttempt() {
         return currentExecution;
     }
 
+    @Override
+    public Collection<AccessExecution> getCurrentExecutions() {
+        return Collections.unmodifiableCollection(currentExecutions);

Review Comment:
   I'm not too concerned about this because it's relatively lightweight and has been widely adopted in runtime code, as long as it will not be invoked too frequently (e.g. O(N) per RPC).





[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932469098


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java:
##########
@@ -182,30 +188,32 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
                 allFinished &= state.isTerminal();
                 endTime = Math.max(endTime, vertex.getStateTimestamp(state));
 
-                counts.addIOMetrics(
-                        vertex.getCurrentExecutionAttempt(),
-                        metricFetcher,
-                        jobID.toString(),
-                        jobVertex.getJobVertexId().toString());
-                MutableIOMetrics current = new MutableIOMetrics();
-                current.addIOMetrics(
-                        vertex.getCurrentExecutionAttempt(),
-                        metricFetcher,
-                        jobID.toString(),
-                        jobVertex.getJobVertexId().toString());
-                ioMetricsInfos.add(
-                        new IOMetricsInfo(
-                                current.getNumBytesIn(),
-                                current.isNumBytesInComplete(),
-                                current.getNumBytesOut(),
-                                current.isNumBytesOutComplete(),
-                                current.getNumRecordsIn(),
-                                current.isNumRecordsInComplete(),
-                                current.getNumRecordsOut(),
-                                current.isNumRecordsOutComplete(),
-                                current.getAccumulateBackPressuredTime(),
-                                current.getAccumulateIdleTime(),
-                                current.getAccumulateBusyTime()));
+                for (AccessExecution attempt : vertex.getCurrentExecutions()) {

Review Comment:
   You are right. However, while trying a further refactor of this handler, I found that we should probably use only the representative execution to calculate the aggregated task manager status; otherwise the state can be confusing, since if one attempt is CANCELED (even a speculative one), the state of the task manager becomes CANCELED. I don't think that is the expected behavior. I'll revert the changes and add a comment describing why we use only the representative attempt.
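
   A sketch of the intent described here (illustrative, not the exact handler code):

   ```java
   // Derive the aggregated task manager state from the representative attempt
   // only. If all current attempts were considered, a single CANCELED
   // (possibly speculative) attempt could make the whole task manager appear
   // CANCELED even though another attempt is still RUNNING.
   ExecutionState state = vertex.getCurrentExecutionAttempt().getState();
   ```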



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java:
##########
@@ -99,25 +99,27 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
         List<ArchivedJson> archive = new ArrayList<>(16);
         for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
             for (AccessExecutionVertex subtask : task.getTaskVertices()) {
-                ResponseBody curAttemptJson =
-                        createAccumulatorInfo(subtask.getCurrentExecutionAttempt());
-                String curAttemptPath =
-                        getMessageHeaders()
-                                .getTargetRestEndpointURL()
-                                .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
-                                .replace(
-                                        ':' + JobVertexIdPathParameter.KEY,
-                                        task.getJobVertexId().toString())
-                                .replace(
-                                        ':' + SubtaskIndexPathParameter.KEY,
-                                        String.valueOf(subtask.getParallelSubtaskIndex()))
-                                .replace(
-                                        ':' + SubtaskAttemptPathParameter.KEY,
-                                        String.valueOf(
-                                                subtask.getCurrentExecutionAttempt()
-                                                        .getAttemptNumber()));
-
-                archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+                for (AccessExecution attempt : subtask.getCurrentExecutions()) {
+                    if (attempt != null) {

Review Comment:
   It refers to the HistoricalExecutions part, but it is indeed redundant, since the attempt should never be null. I'll remove the check.
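
   With the null check removed, the archiving loop could read as follows (sketch reassembled from the removed code above):

   ```java
   for (AccessExecution attempt : subtask.getCurrentExecutions()) {
       // Attempts returned by getCurrentExecutions() are never null,
       // so each one can be archived directly.
       ResponseBody attemptJson = createAccumulatorInfo(attempt);
       String attemptPath =
               getMessageHeaders()
                       .getTargetRestEndpointURL()
                       .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
                       .replace(
                               ':' + JobVertexIdPathParameter.KEY,
                               task.getJobVertexId().toString())
                       .replace(
                               ':' + SubtaskIndexPathParameter.KEY,
                               String.valueOf(subtask.getParallelSubtaskIndex()))
                       .replace(
                               ':' + SubtaskAttemptPathParameter.KEY,
                               String.valueOf(attempt.getAttemptNumber()));
       archive.add(new ArchivedJson(attemptPath, attemptJson));
   }
   ```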



##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java:
##########
@@ -257,18 +258,19 @@ private Map<TaskManagerLocation, ImmutableSet<ExecutionAttemptID>> groupExecutio
                         executionVertex.getExecutionState());
                 continue;
             }
-            TaskManagerLocation tmLocation = executionVertex.getCurrentAssignedResourceLocation();
-            if (tmLocation == null) {
-                LOG.trace("ExecutionVertex {} is currently not assigned", executionVertex);
-                continue;
-            }
-            Set<ExecutionAttemptID> groupedAttemptIds =
-                    executionAttemptsByLocation.getOrDefault(tmLocation, new HashSet<>());
+            for (AccessExecution execution : executionVertex.getCurrentExecutions()) {

Review Comment:
   An ExecutionAttemptID identifies a specific attempt of a specific execution. The further processing doesn't seem to care whether multiple ExecutionAttemptIDs belong to the same execution; it processes each ExecutionAttemptID independently.
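
   A sketch of the per-attempt grouping being discussed, assuming `AccessExecution#getAttemptId()` returns the attempt's ExecutionAttemptID as in the runtime code:

   ```java
   for (AccessExecution execution : executionVertex.getCurrentExecutions()) {
       TaskManagerLocation tmLocation = execution.getAssignedResourceLocation();
       if (tmLocation == null) {
           LOG.trace("Execution {} is currently not assigned", execution);
           continue;
       }
       // Each ExecutionAttemptID is grouped and later processed independently,
       // even when several attempts belong to the same execution vertex.
       executionAttemptsByLocation
               .computeIfAbsent(tmLocation, ignored -> new HashSet<>())
               .add(execution.getAttemptId());
   }
   ```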



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null
+                                ? -1
+                                : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
+                if (!allAttemptsMetricStores.containsKey(currentAttempt)) {
+                    // allAttemptsMetricStores is not empty here
+                    currentAttempt = allAttemptsMetricStores.keySet().iterator().next();
+                }
+                List<SubtaskBackPressureInfo> otherConcurrentAttempts =
+                        new ArrayList<>(allAttemptsMetricStores.size() - 1);
+                for (Map.Entry<Integer, ComponentMetricStore> attemptStore :
+                        allAttemptsMetricStores.entrySet()) {
+                    if (attemptStore.getKey() == currentAttempt) {
+                        continue;
+                    }
+                    otherConcurrentAttempts.add(
+                            createSubtaskAttemptBackpressureInfo(
+                                    subtaskIndex,
+                                    attemptStore.getKey(),
+                                    attemptStore.getValue(),
+                                    null));
+                }
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex,
+                                currentAttempt,
+                                allAttemptsMetricStores.get(currentAttempt),
+                                otherConcurrentAttempts));
+            }
         }
         result.sort(Comparator.comparingInt(SubtaskBackPressureInfo::getSubtask));

Review Comment:
   Yes, it is, since there is only one element in the result for each subtask; the other attempts are in otherConcurrentAttempts.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null

Review Comment:
   currentExecutionAttempts is acquired from the JobDetails. In fact, the currentExecutionAttempts entry of a subtask only exists when there is more than one current attempt, so it is normal for currentExecutionAttempts to be null.





[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932427401


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -156,6 +179,27 @@ public synchronized ComponentMetricStore getSubtaskMetricStore(
         return ComponentMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex));
     }
 
+    public synchronized ComponentMetricStore getSubtaskAttemptMetricStore(
+            String jobID, String taskID, int subtaskIndex, int attemptId) {
+        JobMetricStore job = jobID == null ? null : jobs.get(jobID);
+        if (job == null) {
+            return null;
+        }
+        TaskMetricStore task = job.getTaskMetricStore(taskID);
+        if (task == null) {
+            return null;
+        }
+        SubtaskMetricStore subtask = task.getSubtaskMetricStore(subtaskIndex);
+        if (attemptId < 0) {

Review Comment:
   A negative attempt number used to be considered valid, back when there were constructors that used -1 as the default value; it was used to acquire the subtask-level metrics instead of the attempt-level ones. Now that we have removed those constructors, a negative attempt number is indeed invalid. I'll clean up the related code.
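
   A minimal sketch of the cleanup, assuming negative attempt numbers are now rejected up front (`getAttemptsMetricStore` is an assumed accessor name, and rejecting via `null` rather than an exception is also an assumption):

   ```java
   public synchronized ComponentMetricStore getSubtaskAttemptMetricStore(
           String jobID, String taskID, int subtaskIndex, int attemptId) {
       // Negative attempt numbers were only meaningful for the removed
       // constructors that defaulted to -1; they are now invalid input.
       if (attemptId < 0) {
           return null;
       }
       JobMetricStore job = jobID == null ? null : jobs.get(jobID);
       if (job == null) {
           return null;
       }
       TaskMetricStore task = job.getTaskMetricStore(taskID);
       if (task == null) {
           return null;
       }
       SubtaskMetricStore subtask = task.getSubtaskMetricStore(subtaskIndex);
       if (subtask == null) {
           return null;
       }
       return ComponentMetricStore.unmodifiable(subtask.getAttemptsMetricStore(attemptId));
   }
   ```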





[GitHub] [flink] pltbkd commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932905277


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java:
##########
@@ -127,22 +128,24 @@ private static JobExceptionsInfoWithHistory createJobExceptionsInfo(
         List<JobExceptionsInfo.ExecutionExceptionInfo> taskExceptionList = new ArrayList<>();
         boolean truncated = false;
         for (AccessExecutionVertex task : executionGraph.getAllExecutionVertices()) {
-            Optional<ErrorInfo> failure = task.getFailureInfo();
-            if (failure.isPresent()) {
-                if (taskExceptionList.size() >= exceptionToReportMaxSize) {
-                    truncated = true;
-                    break;
+            for (AccessExecution execution : task.getCurrentExecutions()) {
+                Optional<ErrorInfo> failure = execution.getFailureInfo();
+                if (failure.isPresent()) {
+                    if (taskExceptionList.size() >= exceptionToReportMaxSize) {
+                        truncated = true;
+                        break;
+                    }
+
+                    TaskManagerLocation location = execution.getAssignedResourceLocation();
+                    String locationString = toString(location);
+                    long timestamp = execution.getStateTimestamp(ExecutionState.FAILED);
+                    taskExceptionList.add(
+                            new JobExceptionsInfo.ExecutionExceptionInfo(
+                                    failure.get().getExceptionAsString(),
+                                    task.getTaskNameWithSubtaskIndex(),

Review Comment:
   I'm not sure we should modify it here in this PR. The change is beneficial, but it doesn't seem related to speculative execution. Maybe it's better to handle it as a separate issue?





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r931757494


##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java:
##########
@@ -130,10 +150,16 @@ public String toString() {
         return "StatusOverview {"
                 + "numTaskManagersConnected="
                 + numTaskManagersConnected
+                + (numTaskManagersBlocked == 0
+                        ? ""
+                        : (", numSlotsBlocked=" + numTaskManagersBlocked))
                 + ", numSlotsTotal="
                 + numSlotsTotal
                 + ", numSlotsAvailable="
                 + numSlotsAvailable
+                + (numSlotsFreeAndBlocked == 0
+                        ? ""
+                        : (", numSlotsBlocked=" + numSlotsFreeAndBlocked))

Review Comment:
   numSlotsBlocked  -> numSlotsFreeAndBlocked



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java:
##########
@@ -130,10 +150,16 @@ public String toString() {
         return "StatusOverview {"
                 + "numTaskManagersConnected="
                 + numTaskManagersConnected
+                + (numTaskManagersBlocked == 0
+                        ? ""
+                        : (", numSlotsBlocked=" + numTaskManagersBlocked))

Review Comment:
   numSlotsBlocked -> numTaskManagersBlocked
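
   With both renames from these two comments applied, the fragment would read (sketch; trailing fields elided):

   ```java
   return "StatusOverview {"
           + "numTaskManagersConnected="
           + numTaskManagersConnected
           + (numTaskManagersBlocked == 0
                   ? ""
                   : (", numTaskManagersBlocked=" + numTaskManagersBlocked))
           + ", numSlotsTotal="
           + numSlotsTotal
           + ", numSlotsAvailable="
           + numSlotsAvailable
           + (numSlotsFreeAndBlocked == 0
                   ? ""
                   : (", numSlotsFreeAndBlocked=" + numSlotsFreeAndBlocked))
           + '}'; // remaining fields elided for brevity
   ```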



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java:
##########
@@ -159,19 +149,11 @@ public void testSerialization() throws IOException, ClassNotFoundException {
         verifySerializability(archivedGraph);
     }
 
-    @Test
-    public void testCreateFromInitializingJobForSuspendedJob() {
-        final ArchivedExecutionGraph suspendedExecutionGraph =
-                ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
-                        new JobID(),
-                        "TestJob",
-                        JobStatus.SUSPENDED,
-                        new Exception("Test suspension exception"),
-                        null,
-                        System.currentTimeMillis());
-
-        assertThat(suspendedExecutionGraph.getState(), is(JobStatus.SUSPENDED));
-        assertThat(suspendedExecutionGraph.getFailureInfo(), notNullValue());
+    public static void assertContainsCheckpointSettings(ArchivedExecutionGraph archivedGraph) {

Review Comment:
   `Test` methods do not need to be public in JUnit 5.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java:
##########
@@ -42,43 +41,34 @@
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-import org.apache.flink.util.OptionalFailure;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Function;
 
 import static java.util.Arrays.asList;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link ArchivedExecutionGraph}. */
 public class ArchivedExecutionGraphTest extends TestLogger {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     private static ExecutionGraph runtimeGraph;
 
-    @BeforeClass
+    @BeforeAll
     public static void setupExecutionGraph() throws Exception {

Review Comment:
   `Before*`/`After*` methods do not need to be public in JUnit 5.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java:
##########
@@ -79,7 +79,7 @@ public void testJobDetailsMarshalling() throws JsonProcessingException {
 
         final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, JobDetails.class);
 
-        assertEquals(expected, unmarshalled);
+        assertThat(expected).isEqualTo(unmarshalled);

Review Comment:
   -> assertThat(unmarshalled).isEqualTo(expected);



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java:
##########
@@ -159,19 +149,11 @@ public void testSerialization() throws IOException, ClassNotFoundException {
         verifySerializability(archivedGraph);
     }
 
-    @Test
-    public void testCreateFromInitializingJobForSuspendedJob() {

Review Comment:
   Moving these methods around makes it hard to compare the diff of the changes.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java:
##########
@@ -42,43 +41,34 @@
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-import org.apache.flink.util.OptionalFailure;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Function;
 
 import static java.util.Arrays.asList;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link ArchivedExecutionGraph}. */
 public class ArchivedExecutionGraphTest extends TestLogger {

Review Comment:
   `public` is no longer needed for JUnit 5 tests.
   `extends TestLogger` is no longer needed either.
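
   A minimal sketch of the JUnit 5 conventions these comments refer to (the class name is illustrative):

   ```java
   import org.junit.jupiter.api.BeforeAll;
   import org.junit.jupiter.api.Test;

   import static org.assertj.core.api.Assertions.assertThat;

   /** No 'public' modifier and no 'extends TestLogger' needed in JUnit 5. */
   class JUnit5StyleExampleTest {

       @BeforeAll
       static void setup() {
           // Lifecycle methods can be package-private as well.
       }

       @Test
       void testSomething() {
           assertThat(1 + 1).isEqualTo(2);
       }
   }
   ```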



##########
flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java:
##########
@@ -101,6 +101,6 @@ public void testJobDetailsCompatibleUnmarshalling() throws IOException {
         final JobDetails unmarshalled =
                 objectMapper.readValue(COMPATIBLE_JOB_DETAILS, JobDetails.class);
 
-        assertEquals(expected, unmarshalled);
+        assertThat(expected).isEqualTo(unmarshalled);

Review Comment:
   -> assertThat(unmarshalled).isEqualTo(expected);





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932833249


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java:
##########
@@ -131,18 +132,23 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
         Map<String, String> taskManagerId2Host = new HashMap<>();
         Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
         for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
-            TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
-            String taskManagerHost =
-                    location == null
-                            ? "(unassigned)"
-                            : location.getHostname() + ':' + location.dataPort();
-            String taskmanagerId =
-                    location == null ? "(unassigned)" : location.getResourceID().toString();
-            taskManagerId2Host.put(taskmanagerId, taskManagerHost);
-            List<AccessExecutionVertex> vertices =
-                    taskManagerVertices.computeIfAbsent(
-                            taskmanagerId, ignored -> new ArrayList<>(4));
-            vertices.add(vertex);
+            List<TaskManagerLocation> locations =
+                    vertex.getCurrentExecutions().stream()
+                            .map(AccessExecution::getAssignedResourceLocation)
+                            .collect(Collectors.toList());
+            for (TaskManagerLocation location : locations) {
+                String taskManagerHost =
+                        location == null
+                                ? "(unassigned)"
+                                : location.getHostname() + ':' + location.dataPort();
+                String taskmanagerId =
+                        location == null ? "(unassigned)" : location.getResourceID().toString();
+                taskManagerId2Host.put(taskmanagerId, taskManagerHost);
+                List<AccessExecutionVertex> vertices =
+                        taskManagerVertices.computeIfAbsent(
+                                taskmanagerId, ignored -> new ArrayList<>(4));

Review Comment:
   It should be avoided according to the code style guide.
   https://flink.apache.org/contributing/code-style-and-quality-java.html#collections
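
   Assuming the guide's point here is the arbitrary initial capacity of 4, the sketch below drops it and lets the collection grow on demand:

   ```java
   List<AccessExecutionVertex> vertices =
           taskManagerVertices.computeIfAbsent(taskmanagerId, ignored -> new ArrayList<>());
   vertices.add(vertex);
   ```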





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r932838778


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java:
##########
@@ -100,26 +106,73 @@ private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, ComponentMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
         for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore =
+                    taskMetricStore.getSubtaskMetricStore(subtaskIndex);
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null
+                                ? -1
+                                : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
+                if (!allAttemptsMetricStores.containsKey(currentAttempt)) {

Review Comment:
   So this means the representative attempt displayed in the WebUI may not be the true representative attempt? E.g., a CANCELED attempt could be displayed as the representative attempt while there is another RUNNING/FINISHED attempt.


