You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/28 08:02:36 UTC

[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

gaoyunhaii commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r931890536


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java:
##########
@@ -47,6 +48,9 @@ public ClusterOverviewWithVersion(
             @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
             @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
             @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+            @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) @Nullable Integer numTaskManagersBlocked,

Review Comment:
   Similarly could we directly use `int` here?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -79,6 +96,11 @@ public ArchivedExecution getCurrentExecutionAttempt() {
         return currentExecution;
     }
 
+    @Override
+    public Collection<AccessExecution> getCurrentExecutions() {
+        return Collections.unmodifiableCollection(currentExecutions);

Review Comment:
   In consideration of the call times for this method I'm a bit tend to the current executions maintained directly via an unmodified collection. 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -40,15 +44,27 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 
     private final ArchivedExecution currentExecution; // this field must never be null
 
+    private final Collection<AccessExecution> currentExecutions;
+
     // ------------------------------------------------------------------------
 
     public ArchivedExecutionVertex(ExecutionVertex vertex) {
         this.subTaskIndex = vertex.getParallelSubtaskIndex();
         this.executionHistory = getCopyOfExecutionHistory(vertex);
         this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
-        this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
+
+        Execution vertexCurrentExecution = vertex.getCurrentExecutionAttempt();
+        currentExecutions = new ArrayList<>(vertex.getCurrentExecutions().size());
+        currentExecution = vertexCurrentExecution.archive();
+        currentExecutions.add(currentExecution);
+        for (Execution execution : vertex.getCurrentExecutions()) {

Review Comment:
   It seems we should not exclude the representing one from the list from the other part of the code?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -685,15 +688,31 @@ public CompletableFuture<TaskManagerInfoWithSlots> requestTaskManagerDetailsInfo
     @Override
     public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
         final int numberSlots = slotManager.getNumberRegisteredSlots();
-        final int numberFreeSlots = slotManager.getNumberFreeSlots();
         final ResourceProfile totalResource = slotManager.getRegisteredResource();
-        final ResourceProfile freeResource = slotManager.getFreeResource();
 
+        int numberFreeSlots = slotManager.getNumberFreeSlots();

Review Comment:
   Might skip the added checks if `blocklistHandlergetAllBlockedNodeIds() == 0` ?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java:
##########
@@ -44,11 +50,22 @@ public class ClusterOverview extends JobsOverview {
     @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE)
     private final int numSlotsAvailable;
 
+    @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final int numTaskManagersBlocked;
+
+    @JsonProperty(FIELD_NAME_SLOTS_FREE_AND_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final int numSlotsFreeAndBlocked;
+
     @JsonCreator
     public ClusterOverview(
             @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
             @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
             @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+            @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) @Nullable Integer numTaskManagersBlocked,

Review Comment:
   If we have already use `int` for the field, why cannot we also use `int` for the parameter? It seems we also not have situations that passes null actually. 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java:
##########
@@ -271,6 +324,21 @@ public void serialize(
 
             jsonGenerator.writeEndObject();
 
+            if (jobDetails.currentExecutionAttempts != null

Review Comment:
   It seems `currentExecutionAttempts` cannot be null ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org