You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/08/01 10:30:59 UTC
[flink] 06/06: [FLINK-28588][rest] Acquire information of all current executions in REST handlers if applicable
This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f436b20429b55ada2a9e8936a5e80fc672a397de
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Sun Jul 31 22:11:46 2022 +0800
[FLINK-28588][rest] Acquire information of all current executions in REST handlers if applicable
This closes #20296.
---
.../src/test/resources/rest_api_v1.snapshot | 31 +++
.../handler/job/AbstractSubtaskAttemptHandler.java | 15 +-
.../rest/handler/job/JobDetailsHandler.java | 2 +
.../rest/handler/job/JobExceptionsHandler.java | 33 ++--
.../handler/job/JobVertexBackPressureHandler.java | 87 +++++++--
.../rest/handler/job/JobVertexDetailsHandler.java | 17 +-
.../handler/job/JobVertexTaskManagersHandler.java | 73 +++++---
.../job/SubtaskCurrentAttemptDetailsHandler.java | 19 +-
...SubtaskExecutionAttemptAccumulatorsHandler.java | 38 ++--
.../job/SubtaskExecutionAttemptDetailsHandler.java | 52 +++---
.../job/SubtasksAllAccumulatorsHandler.java | 32 ++--
.../rest/handler/job/SubtasksTimesHandler.java | 3 +-
.../rest/messages/JobVertexBackPressureInfo.java | 45 ++++-
.../job/SubtaskExecutionAttemptDetailsInfo.java | 105 +++++++----
.../threadinfo/JobVertexThreadInfoTracker.java | 24 +--
.../job/JobVertexBackPressureHandlerTest.java | 207 +++++++++++++++++++++
.../SubtaskCurrentAttemptDetailsHandlerTest.java | 3 +-
.../SubtaskExecutionAttemptDetailsHandlerTest.java | 3 +-
.../messages/AggregatedTaskDetailsInfoTest.java | 3 +-
.../messages/JobVertexBackPressureInfoTest.java | 24 ++-
.../rest/messages/JobVertexDetailsInfoTest.java | 24 ++-
.../SubtaskExecutionAttemptDetailsInfoTest.java | 3 +-
22 files changed, 651 insertions(+), 192 deletions(-)
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 873e5062d7d..85337fa8795 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -2403,6 +2403,13 @@
"type" : "integer"
}
},
+ "other-concurrent-attempts" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo"
+ }
+ },
"start_time" : {
"type" : "integer"
}
@@ -2522,6 +2529,9 @@
"subtask" : {
"type" : "integer"
},
+ "attempt-number" : {
+ "type" : "integer"
+ },
"backpressure-level" : {
"type" : "string",
"enum" : [ "ok", "low", "high" ]
@@ -2534,6 +2544,13 @@
},
"busyRatio" : {
"type" : "number"
+ },
+ "other-concurrent-attempts" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexBackPressureInfo:SubtaskBackPressureInfo"
+ }
}
}
}
@@ -2803,6 +2820,13 @@
"type" : "integer"
}
},
+ "other-concurrent-attempts" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo"
+ }
+ },
"start_time" : {
"type" : "integer"
}
@@ -2904,6 +2928,13 @@
"type" : "integer"
}
},
+ "other-concurrent-attempts" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo"
+ }
+ },
"start_time" : {
"type" : "integer"
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java
index 15d128e43e2..04b066860ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
@@ -88,11 +89,17 @@ public abstract class AbstractSubtaskAttemptHandler<
throws RestHandlerException {
final Integer attemptNumber = request.getPathParameter(SubtaskAttemptPathParameter.class);
- final AccessExecution currentAttempt = executionVertex.getCurrentExecutionAttempt();
+ final Collection<AccessExecution> currentExecutions =
+ executionVertex.getCurrentExecutions();
final ExecutionHistory executionHistory = executionVertex.getExecutionHistory();
- if (attemptNumber == currentAttempt.getAttemptNumber()) {
- return handleRequest(request, currentAttempt);
- } else if (executionHistory.isValidAttemptNumber(attemptNumber)) {
+
+ for (AccessExecution currentExecution : currentExecutions) {
+ if (attemptNumber == currentExecution.getAttemptNumber()) {
+ return handleRequest(request, currentExecution);
+ }
+ }
+
+ if (executionHistory.isValidAttemptNumber(attemptNumber)) {
final Optional<? extends AccessExecution> execution =
executionHistory.getHistoricalExecution(attemptNumber);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 4f850bc0712..f38a7688ab9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -202,6 +202,8 @@ public class JobDetailsHandler
MutableIOMetrics counts = new MutableIOMetrics();
for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
+ // Here we use the metrics of one of the current attempts to represent the subtask,
+ // rather than the aggregation of all attempts.
counts.addIOMetrics(
vertex.getCurrentExecutionAttempt(),
metricFetcher,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index c500792c6aa..4ae57f6bc28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
@@ -127,22 +128,24 @@ public class JobExceptionsHandler
List<JobExceptionsInfo.ExecutionExceptionInfo> taskExceptionList = new ArrayList<>();
boolean truncated = false;
for (AccessExecutionVertex task : executionGraph.getAllExecutionVertices()) {
- Optional<ErrorInfo> failure = task.getFailureInfo();
- if (failure.isPresent()) {
- if (taskExceptionList.size() >= exceptionToReportMaxSize) {
- truncated = true;
- break;
+ for (AccessExecution execution : task.getCurrentExecutions()) {
+ Optional<ErrorInfo> failure = execution.getFailureInfo();
+ if (failure.isPresent()) {
+ if (taskExceptionList.size() >= exceptionToReportMaxSize) {
+ truncated = true;
+ break;
+ }
+
+ TaskManagerLocation location = execution.getAssignedResourceLocation();
+ String locationString = toString(location);
+ long timestamp = execution.getStateTimestamp(ExecutionState.FAILED);
+ taskExceptionList.add(
+ new JobExceptionsInfo.ExecutionExceptionInfo(
+ failure.get().getExceptionAsString(),
+ task.getTaskNameWithSubtaskIndex(),
+ locationString,
+ timestamp == 0 ? -1 : timestamp));
}
-
- TaskManagerLocation location = task.getCurrentAssignedResourceLocation();
- String locationString = toString(location);
- long timestamp = task.getStateTimestamp(ExecutionState.FAILED);
- taskExceptionList.add(
- new JobExceptionsInfo.ExecutionExceptionInfo(
- failure.get().getExceptionAsString(),
- task.getTaskNameWithSubtaskIndex(),
- locationString,
- timestamp == 0 ? -1 : timestamp));
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
index 895fe7011ba..898b74ba0d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.ComponentMetricStore;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.SubtaskMetricStore;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.TaskMetricStore;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
@@ -39,6 +40,7 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Comparator;
@@ -80,18 +82,23 @@ public class JobVertexBackPressureHandler
metricFetcher
.getMetricStore()
.getTaskMetricStore(jobId.toString(), jobVertexId.toString());
+ Map<String, Map<Integer, Integer>> jobCurrentExecutions =
+ metricFetcher.getMetricStore().getCurrentExecutionAttempts().get(jobId.toString());
+ Map<Integer, Integer> currentExecutionAttempts =
+ jobCurrentExecutions != null
+ ? jobCurrentExecutions.get(jobVertexId.toString())
+ : null;
return CompletableFuture.completedFuture(
taskMetricStore != null
- ? createJobVertexBackPressureInfo(
- taskMetricStore.getAllSubtaskMetricStores())
+ ? createJobVertexBackPressureInfo(taskMetricStore, currentExecutionAttempts)
: JobVertexBackPressureInfo.deprecated());
}
private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
- Map<Integer, ComponentMetricStore> allSubtaskMetricStores) {
+ TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
List<SubtaskBackPressureInfo> subtaskBackPressureInfos =
- createSubtaskBackPressureInfo(allSubtaskMetricStores);
+ createSubtaskBackPressureInfo(taskMetricStore, currentExecutionAttempts);
return new JobVertexBackPressureInfo(
JobVertexBackPressureInfo.VertexBackPressureStatus.OK,
getBackPressureLevel(getMaxBackPressureRatio(subtaskBackPressureInfos)),
@@ -100,26 +107,72 @@ public class JobVertexBackPressureHandler
}
private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
- Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+ TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+ Map<Integer, SubtaskMetricStore> subtaskMetricStores =
+ taskMetricStore.getAllSubtaskMetricStores();
List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
- for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
+ for (Map.Entry<Integer, SubtaskMetricStore> entry : subtaskMetricStores.entrySet()) {
int subtaskIndex = entry.getKey();
- ComponentMetricStore subtaskMetricStore = entry.getValue();
- double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
- double idleRatio = getIdleRatio(subtaskMetricStore);
- double busyRatio = getBusyRatio(subtaskMetricStore);
- result.add(
- new SubtaskBackPressureInfo(
- subtaskIndex,
- getBackPressureLevel(backPressureRatio),
- backPressureRatio,
- idleRatio,
- busyRatio));
+ SubtaskMetricStore subtaskMetricStore = entry.getValue();
+ Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+ subtaskMetricStore.getAllAttemptsMetricStores();
+ if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+ result.add(
+ createSubtaskAttemptBackpressureInfo(
+ subtaskIndex, null, subtaskMetricStore, null));
+ } else {
+ int currentAttempt =
+ currentExecutionAttempts == null
+ ? -1
+ : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
+ if (!allAttemptsMetricStores.containsKey(currentAttempt)) {
+ // allAttemptsMetricStores is not empty here
+ currentAttempt = allAttemptsMetricStores.keySet().iterator().next();
+ }
+ List<SubtaskBackPressureInfo> otherConcurrentAttempts =
+ new ArrayList<>(allAttemptsMetricStores.size() - 1);
+ for (Map.Entry<Integer, ComponentMetricStore> attemptStore :
+ allAttemptsMetricStores.entrySet()) {
+ if (attemptStore.getKey() == currentAttempt) {
+ continue;
+ }
+ otherConcurrentAttempts.add(
+ createSubtaskAttemptBackpressureInfo(
+ subtaskIndex,
+ attemptStore.getKey(),
+ attemptStore.getValue(),
+ null));
+ }
+ result.add(
+ createSubtaskAttemptBackpressureInfo(
+ subtaskIndex,
+ currentAttempt,
+ allAttemptsMetricStores.get(currentAttempt),
+ otherConcurrentAttempts));
+ }
}
result.sort(Comparator.comparingInt(SubtaskBackPressureInfo::getSubtask));
return result;
}
+ private SubtaskBackPressureInfo createSubtaskAttemptBackpressureInfo(
+ int subtaskIndex,
+ @Nullable Integer attemptNumber,
+ ComponentMetricStore metricStore,
+ @Nullable List<SubtaskBackPressureInfo> otherConcurrentAttempts) {
+ double backPressureRatio = getBackPressureRatio(metricStore);
+ double idleRatio = getIdleRatio(metricStore);
+ double busyRatio = getBusyRatio(metricStore);
+ return new SubtaskBackPressureInfo(
+ subtaskIndex,
+ attemptNumber,
+ getBackPressureLevel(backPressureRatio),
+ backPressureRatio,
+ idleRatio,
+ busyRatio,
+ otherConcurrentAttempts);
+ }
+
private double getMaxBackPressureRatio(List<SubtaskBackPressureInfo> subtaskBackPressureInfos) {
return subtaskBackPressureInfos.stream()
.mapToDouble(backPressureInfo -> backPressureInfo.getBackPressuredRatio())
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
index e4ac7708489..5877567db77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
@@ -120,9 +120,24 @@ public class JobVertexDetailsHandler
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
final AccessExecution execution = vertex.getCurrentExecutionAttempt();
final JobVertexID jobVertexID = jobVertex.getJobVertexId();
+
+ final Collection<AccessExecution> attempts = vertex.getCurrentExecutions();
+ List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts = null;
+
+ if (attempts.size() > 1) {
+ otherConcurrentAttempts = new ArrayList<>();
+ for (AccessExecution attempt : attempts) {
+ if (attempt.getAttemptNumber() != execution.getAttemptNumber()) {
+ otherConcurrentAttempts.add(
+ SubtaskExecutionAttemptDetailsInfo.create(
+ attempt, metricFetcher, jobID, jobVertexID, null));
+ }
+ }
+ }
+
subtasks.add(
SubtaskExecutionAttemptDetailsInfo.create(
- execution, metricFetcher, jobID, jobVertexID));
+ execution, metricFetcher, jobID, jobVertexID, otherConcurrentAttempts));
}
return new JobVertexDetailsInfo(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 3feea441d2f..450535f8c7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -55,8 +56,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
@@ -127,40 +130,48 @@ public class JobVertexTaskManagersHandler
AccessExecutionJobVertex jobVertex,
JobID jobID,
@Nullable MetricFetcher metricFetcher) {
- // Build a map that groups tasks by TaskManager
+ // Build a map that groups task executions by TaskManager
Map<String, String> taskManagerId2Host = new HashMap<>();
- Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
+ Map<String, List<AccessExecution>> taskManagerExecutions = new HashMap<>();
+ Set<AccessExecution> representativeExecutions = new HashSet<>();
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
- TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
- String taskManagerHost =
- location == null
- ? "(unassigned)"
- : location.getHostname() + ':' + location.dataPort();
- String taskmanagerId =
- location == null ? "(unassigned)" : location.getResourceID().toString();
- taskManagerId2Host.put(taskmanagerId, taskManagerHost);
- List<AccessExecutionVertex> vertices =
- taskManagerVertices.computeIfAbsent(
- taskmanagerId, ignored -> new ArrayList<>(4));
- vertices.add(vertex);
+ AccessExecution representativeAttempt = vertex.getCurrentExecutionAttempt();
+ representativeExecutions.add(representativeAttempt);
+
+ for (AccessExecution execution : vertex.getCurrentExecutions()) {
+ TaskManagerLocation location = execution.getAssignedResourceLocation();
+ String taskManagerHost =
+ location == null
+ ? "(unassigned)"
+ : location.getHostname() + ':' + location.dataPort();
+ String taskmanagerId =
+ location == null ? "(unassigned)" : location.getResourceID().toString();
+ taskManagerId2Host.put(taskmanagerId, taskManagerHost);
+ List<AccessExecution> executions =
+ taskManagerExecutions.computeIfAbsent(
+ taskmanagerId, ignored -> new ArrayList<>());
+ executions.add(execution);
+ }
}
final long now = System.currentTimeMillis();
List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>(4);
- for (Map.Entry<String, List<AccessExecutionVertex>> entry :
- taskManagerVertices.entrySet()) {
+ for (Map.Entry<String, List<AccessExecution>> entry : taskManagerExecutions.entrySet()) {
String taskmanagerId = entry.getKey();
String host = taskManagerId2Host.get(taskmanagerId);
- List<AccessExecutionVertex> taskVertices = entry.getValue();
+ List<AccessExecution> executions = entry.getValue();
List<IOMetricsInfo> ioMetricsInfos = new ArrayList<>();
List<Map<ExecutionState, Long>> status =
- taskVertices.stream()
- .map(AccessExecutionVertex::getCurrentExecutionAttempt)
+ executions.stream()
.map(StatusDurationUtils::getExecutionStateDuration)
.collect(Collectors.toList());
+ // executionsPerState counts attempts of a subtask separately
+ int[] executionsPerState = new int[ExecutionState.values().length];
+ // tasksPerState counts only the representative attempts, and is used to aggregate the
+ // task manager state
int[] tasksPerState = new int[ExecutionState.values().length];
long startTime = Long.MAX_VALUE;
@@ -169,27 +180,32 @@ public class JobVertexTaskManagersHandler
MutableIOMetrics counts = new MutableIOMetrics();
- for (AccessExecutionVertex vertex : taskVertices) {
- final ExecutionState state = vertex.getExecutionState();
- tasksPerState[state.ordinal()]++;
+ int representativeAttemptsCount = 0;
+ for (AccessExecution execution : executions) {
+ final ExecutionState state = execution.getState();
+ executionsPerState[state.ordinal()]++;
+ if (representativeExecutions.contains(execution)) {
+ tasksPerState[state.ordinal()]++;
+ representativeAttemptsCount++;
+ }
// take the earliest start time
- long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
+ long started = execution.getStateTimestamp(ExecutionState.DEPLOYING);
if (started > 0) {
startTime = Math.min(startTime, started);
}
allFinished &= state.isTerminal();
- endTime = Math.max(endTime, vertex.getStateTimestamp(state));
+ endTime = Math.max(endTime, execution.getStateTimestamp(state));
counts.addIOMetrics(
- vertex.getCurrentExecutionAttempt(),
+ execution,
metricFetcher,
jobID.toString(),
jobVertex.getJobVertexId().toString());
MutableIOMetrics current = new MutableIOMetrics();
current.addIOMetrics(
- vertex.getCurrentExecutionAttempt(),
+ execution,
metricFetcher,
jobID.toString(),
jobVertex.getJobVertexId().toString());
@@ -222,9 +238,10 @@ public class JobVertexTaskManagersHandler
duration = -1L;
}
+ // Safe when tasksPerState are all zero and representativeAttemptsCount is zero
ExecutionState jobVertexState =
ExecutionJobVertex.getAggregateJobVertexState(
- tasksPerState, taskVertices.size());
+ tasksPerState, representativeAttemptsCount);
final IOMetricsInfo jobVertexMetrics =
new IOMetricsInfo(
@@ -243,7 +260,7 @@ public class JobVertexTaskManagersHandler
Map<ExecutionState, Integer> statusCounts =
new HashMap<>(ExecutionState.values().length);
for (ExecutionState state : ExecutionState.values()) {
- statusCounts.put(state, tasksPerState[state.ordinal()]);
+ statusCounts.put(state, executionsPerState[state.ordinal()]);
}
taskManagersInfoList.add(
new JobVertexTaskManagersInfo.TaskManagersInfo(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
index ba25ac5c24f..0e52d088158 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
@@ -37,6 +37,9 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -81,7 +84,21 @@ public class SubtaskCurrentAttemptDetailsHandler
final JobID jobID = request.getPathParameter(JobIDPathParameter.class);
final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
+ final Collection<AccessExecution> attempts = executionVertex.getCurrentExecutions();
+ List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts = null;
+
+ if (attempts.size() > 1) {
+ otherConcurrentAttempts = new ArrayList<>();
+ for (AccessExecution attempt : attempts) {
+ if (attempt.getAttemptNumber() != execution.getAttemptNumber()) {
+ otherConcurrentAttempts.add(
+ SubtaskExecutionAttemptDetailsInfo.create(
+ attempt, metricFetcher, jobID, jobVertexID, null));
+ }
+ }
+ }
+
return SubtaskExecutionAttemptDetailsInfo.create(
- execution, metricFetcher, jobID, jobVertexID);
+ execution, metricFetcher, jobID, jobVertexID, otherConcurrentAttempts);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
index 7093308bf3b..a188a19cb5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -99,25 +99,25 @@ public class SubtaskExecutionAttemptAccumulatorsHandler
List<ArchivedJson> archive = new ArrayList<>(16);
for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
for (AccessExecutionVertex subtask : task.getTaskVertices()) {
- ResponseBody curAttemptJson =
- createAccumulatorInfo(subtask.getCurrentExecutionAttempt());
- String curAttemptPath =
- getMessageHeaders()
- .getTargetRestEndpointURL()
- .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
- .replace(
- ':' + JobVertexIdPathParameter.KEY,
- task.getJobVertexId().toString())
- .replace(
- ':' + SubtaskIndexPathParameter.KEY,
- String.valueOf(subtask.getParallelSubtaskIndex()))
- .replace(
- ':' + SubtaskAttemptPathParameter.KEY,
- String.valueOf(
- subtask.getCurrentExecutionAttempt()
- .getAttemptNumber()));
-
- archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+ for (AccessExecution attempt : subtask.getCurrentExecutions()) {
+ ResponseBody curAttemptJson = createAccumulatorInfo(attempt);
+ String curAttemptPath =
+ getMessageHeaders()
+ .getTargetRestEndpointURL()
+ .replace(
+ ':' + JobIDPathParameter.KEY,
+ graph.getJobID().toString())
+ .replace(
+ ':' + JobVertexIdPathParameter.KEY,
+ task.getJobVertexId().toString())
+ .replace(
+ ':' + SubtaskIndexPathParameter.KEY,
+ String.valueOf(subtask.getParallelSubtaskIndex()))
+ .replace(
+ ':' + SubtaskAttemptPathParameter.KEY,
+ String.valueOf(attempt.getAttemptNumber()));
+ archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+ }
for (AccessExecution attempt :
subtask.getExecutionHistory().getHistoricalExecutions()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
index 398a10c57b4..d4eaa00906e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
@@ -102,7 +102,7 @@ public class SubtaskExecutionAttemptDetailsHandler
final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
return SubtaskExecutionAttemptDetailsInfo.create(
- execution, metricFetcher, jobID, jobVertexID);
+ execution, metricFetcher, jobID, jobVertexID, null);
}
@Override
@@ -111,36 +111,38 @@ public class SubtaskExecutionAttemptDetailsHandler
List<ArchivedJson> archive = new ArrayList<>(16);
for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
for (AccessExecutionVertex subtask : task.getTaskVertices()) {
- ResponseBody curAttemptJson =
- SubtaskExecutionAttemptDetailsInfo.create(
- subtask.getCurrentExecutionAttempt(),
- null,
- graph.getJobID(),
- task.getJobVertexId());
- String curAttemptPath =
- getMessageHeaders()
- .getTargetRestEndpointURL()
- .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
- .replace(
- ':' + JobVertexIdPathParameter.KEY,
- task.getJobVertexId().toString())
- .replace(
- ':' + SubtaskIndexPathParameter.KEY,
- String.valueOf(subtask.getParallelSubtaskIndex()))
- .replace(
- ':' + SubtaskAttemptPathParameter.KEY,
- String.valueOf(
- subtask.getCurrentExecutionAttempt()
- .getAttemptNumber()));
-
- archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+ for (AccessExecution attempt : subtask.getCurrentExecutions()) {
+ ResponseBody curAttemptJson =
+ SubtaskExecutionAttemptDetailsInfo.create(
+ attempt, null, graph.getJobID(), task.getJobVertexId(), null);
+ String curAttemptPath =
+ getMessageHeaders()
+ .getTargetRestEndpointURL()
+ .replace(
+ ':' + JobIDPathParameter.KEY,
+ graph.getJobID().toString())
+ .replace(
+ ':' + JobVertexIdPathParameter.KEY,
+ task.getJobVertexId().toString())
+ .replace(
+ ':' + SubtaskIndexPathParameter.KEY,
+ String.valueOf(subtask.getParallelSubtaskIndex()))
+ .replace(
+ ':' + SubtaskAttemptPathParameter.KEY,
+ String.valueOf(attempt.getAttemptNumber()));
+ archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+ }
for (AccessExecution attempt :
subtask.getExecutionHistory().getHistoricalExecutions()) {
if (attempt != null) {
ResponseBody json =
SubtaskExecutionAttemptDetailsInfo.create(
- attempt, null, graph.getJobID(), task.getJobVertexId());
+ attempt,
+ null,
+ graph.getJobID(),
+ task.getJobVertexId(),
+ null);
String path =
getMessageHeaders()
.getTargetRestEndpointURL()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
index 88c3cc851ad..a47ad6a46be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.job;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -75,23 +76,24 @@ public class SubtasksAllAccumulatorsHandler
new ArrayList<>();
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
- TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
- String locationString = location == null ? "(unassigned)" : location.getHostname();
+ for (AccessExecution execution : vertex.getCurrentExecutions()) {
+ TaskManagerLocation location = execution.getAssignedResourceLocation();
+ String locationString = location == null ? "(unassigned)" : location.getHostname();
- StringifiedAccumulatorResult[] accs =
- vertex.getCurrentExecutionAttempt().getUserAccumulatorsStringified();
- List<UserAccumulator> userAccumulators = new ArrayList<>(accs.length);
- for (StringifiedAccumulatorResult acc : accs) {
- userAccumulators.add(
- new UserAccumulator(acc.getName(), acc.getType(), acc.getValue()));
- }
+ StringifiedAccumulatorResult[] accs = execution.getUserAccumulatorsStringified();
+ List<UserAccumulator> userAccumulators = new ArrayList<>(accs.length);
+ for (StringifiedAccumulatorResult acc : accs) {
+ userAccumulators.add(
+ new UserAccumulator(acc.getName(), acc.getType(), acc.getValue()));
+ }
- subtaskAccumulatorsInfos.add(
- new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo(
- vertex.getCurrentExecutionAttempt().getParallelSubtaskIndex(),
- vertex.getCurrentExecutionAttempt().getAttemptNumber(),
- locationString,
- userAccumulators));
+ subtaskAccumulatorsInfos.add(
+ new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo(
+ execution.getParallelSubtaskIndex(),
+ execution.getAttemptNumber(),
+ locationString,
+ userAccumulators));
+ }
}
return new SubtasksAllAccumulatorsInfo(jobVertexId, parallelism, subtaskAccumulatorsInfos);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
index 32f04899810..ac9c8322c9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
@@ -101,7 +101,8 @@ public class SubtasksTimesHandler
int num = 0;
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
-
+ // Use one of the current execution attempts to represent the subtask, rather than
+ // adding times info of all attempts.
long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps();
ExecutionState status = vertex.getExecutionState();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
index 5415c297cbf..c475b6d7cd5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
@@ -121,14 +122,22 @@ public class JobVertexBackPressureInfo implements ResponseBody {
public static final class SubtaskBackPressureInfo {
public static final String FIELD_NAME_SUBTASK = "subtask";
+ public static final String FIELD_NAME_ATTEMPT_NUMBER = "attempt-number";
public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
public static final String FIELD_NAME_BACK_PRESSURED_RATIO = "ratio";
public static final String FIELD_NAME_IDLE_RATIO = "idleRatio";
public static final String FIELD_NAME_BUSY_RATIO = "busyRatio";
+ public static final String FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS =
+ "other-concurrent-attempts";
@JsonProperty(FIELD_NAME_SUBTASK)
private final int subtask;
+ @JsonProperty(FIELD_NAME_ATTEMPT_NUMBER)
+ @JsonInclude(Include.NON_NULL)
+ @Nullable
+ private final Integer attemptNumber;
+
@JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
private final VertexBackPressureLevel backpressureLevel;
@@ -141,18 +150,30 @@ public class JobVertexBackPressureInfo implements ResponseBody {
@JsonProperty(FIELD_NAME_BUSY_RATIO)
private final double busyRatio;
+ @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS)
+ @JsonInclude(Include.NON_EMPTY)
+ @Nullable
+ private final List<SubtaskBackPressureInfo> otherConcurrentAttempts;
+
+ // otherConcurrentAttempts and attemptNumber are Nullable since Jackson will assign null if
+ // the fields are absent while parsing
public SubtaskBackPressureInfo(
@JsonProperty(FIELD_NAME_SUBTASK) int subtask,
+ @JsonProperty(FIELD_NAME_ATTEMPT_NUMBER) @Nullable Integer attemptNumber,
@JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
VertexBackPressureLevel backpressureLevel,
@JsonProperty(FIELD_NAME_BACK_PRESSURED_RATIO) double backPressuredRatio,
@JsonProperty(FIELD_NAME_IDLE_RATIO) double idleRatio,
- @JsonProperty(FIELD_NAME_BUSY_RATIO) double busyRatio) {
+ @JsonProperty(FIELD_NAME_BUSY_RATIO) double busyRatio,
+ @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS) @Nullable
+ List<SubtaskBackPressureInfo> otherConcurrentAttempts) {
this.subtask = subtask;
+ this.attemptNumber = attemptNumber;
this.backpressureLevel = checkNotNull(backpressureLevel);
this.backPressuredRatio = backPressuredRatio;
this.idleRatio = idleRatio;
this.busyRatio = busyRatio;
+ this.otherConcurrentAttempts = otherConcurrentAttempts;
}
@Override
@@ -165,16 +186,24 @@ public class JobVertexBackPressureInfo implements ResponseBody {
}
SubtaskBackPressureInfo that = (SubtaskBackPressureInfo) o;
return subtask == that.subtask
+ && Objects.equals(attemptNumber, that.attemptNumber)
&& backPressuredRatio == that.backPressuredRatio
&& idleRatio == that.idleRatio
&& busyRatio == that.busyRatio
- && Objects.equals(backpressureLevel, that.backpressureLevel);
+ && Objects.equals(backpressureLevel, that.backpressureLevel)
+ && Objects.equals(otherConcurrentAttempts, that.otherConcurrentAttempts);
}
@Override
public int hashCode() {
return Objects.hash(
- subtask, backpressureLevel, backPressuredRatio, idleRatio, busyRatio);
+ subtask,
+ attemptNumber,
+ backpressureLevel,
+ backPressuredRatio,
+ idleRatio,
+ busyRatio,
+ otherConcurrentAttempts);
}
public int getSubtask() {
@@ -196,6 +225,16 @@ public class JobVertexBackPressureInfo implements ResponseBody {
public double getBusyRatio() {
return busyRatio;
}
+
+ @Nullable
+ public Integer getAttemptNumber() {
+ return attemptNumber;
+ }
+
+ @Nullable
+ public List<SubtaskBackPressureInfo> getOtherConcurrentAttempts() {
+ return otherConcurrentAttempts;
+ }
}
/** Status of vertex back-pressure. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
index 56577fb260b..1a8463ab559 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
@@ -30,12 +30,16 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.v3.oas.annotations.Hidden;
import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -66,6 +70,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
public static final String FIELD_NAME_STATUS_DURATION = "status-duration";
+ public static final String FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS = "other-concurrent-attempts";
+
@JsonProperty(FIELD_NAME_SUBTASK_INDEX)
private final int subtaskIndex;
@@ -100,7 +106,13 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
@JsonProperty(FIELD_NAME_STATUS_DURATION)
private final Map<ExecutionState, Long> statusDuration;
+ @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS)
+ @JsonInclude(Include.NON_EMPTY)
+ @Nullable
+ private final List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts;
+
@JsonCreator
+ // otherConcurrentAttempts is Nullable since Jackson will assign null if the field is absent
+ // while parsing
public SubtaskExecutionAttemptDetailsInfo(
@JsonProperty(FIELD_NAME_SUBTASK_INDEX) int subtaskIndex,
@JsonProperty(FIELD_NAME_STATUS) ExecutionState status,
@@ -111,7 +123,9 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
@JsonProperty(FIELD_NAME_DURATION) long duration,
@JsonProperty(FIELD_NAME_METRICS) IOMetricsInfo ioMetricsInfo,
@JsonProperty(FIELD_NAME_TASKMANAGER_ID) String taskmanagerId,
- @JsonProperty(FIELD_NAME_STATUS_DURATION) Map<ExecutionState, Long> statusDuration) {
+ @JsonProperty(FIELD_NAME_STATUS_DURATION) Map<ExecutionState, Long> statusDuration,
+ @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS) @Nullable
+ List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts) {
this.subtaskIndex = subtaskIndex;
this.status = Preconditions.checkNotNull(status);
@@ -124,6 +138,7 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
this.ioMetricsInfo = Preconditions.checkNotNull(ioMetricsInfo);
this.taskmanagerId = Preconditions.checkNotNull(taskmanagerId);
this.statusDuration = Preconditions.checkNotNull(statusDuration);
+ this.otherConcurrentAttempts = otherConcurrentAttempts;
}
public int getSubtaskIndex() {
@@ -174,51 +189,16 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
return taskmanagerId;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- SubtaskExecutionAttemptDetailsInfo that = (SubtaskExecutionAttemptDetailsInfo) o;
-
- return subtaskIndex == that.subtaskIndex
- && status == that.status
- && attempt == that.attempt
- && Objects.equals(host, that.host)
- && startTime == that.startTime
- && startTimeCompatible == that.startTimeCompatible
- && endTime == that.endTime
- && duration == that.duration
- && Objects.equals(ioMetricsInfo, that.ioMetricsInfo)
- && Objects.equals(taskmanagerId, that.taskmanagerId)
- && Objects.equals(statusDuration, that.statusDuration);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(
- subtaskIndex,
- status,
- attempt,
- host,
- startTime,
- startTimeCompatible,
- endTime,
- duration,
- ioMetricsInfo,
- taskmanagerId,
- statusDuration);
+ public List<SubtaskExecutionAttemptDetailsInfo> getOtherConcurrentAttempts() {
+ return otherConcurrentAttempts == null ? new ArrayList<>() : otherConcurrentAttempts;
}
public static SubtaskExecutionAttemptDetailsInfo create(
AccessExecution execution,
@Nullable MetricFetcher metricFetcher,
JobID jobID,
- JobVertexID jobVertexID) {
+ JobVertexID jobVertexID,
+ @Nullable List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts) {
final ExecutionState status = execution.getState();
final long now = System.currentTimeMillis();
@@ -261,6 +241,49 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
duration,
ioMetricsInfo,
taskmanagerId,
- getExecutionStateDuration(execution));
+ getExecutionStateDuration(execution),
+ otherConcurrentAttempts);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SubtaskExecutionAttemptDetailsInfo that = (SubtaskExecutionAttemptDetailsInfo) o;
+
+ return subtaskIndex == that.subtaskIndex
+ && status == that.status
+ && attempt == that.attempt
+ && Objects.equals(host, that.host)
+ && startTime == that.startTime
+ && startTimeCompatible == that.startTimeCompatible
+ && endTime == that.endTime
+ && duration == that.duration
+ && Objects.equals(ioMetricsInfo, that.ioMetricsInfo)
+ && Objects.equals(taskmanagerId, that.taskmanagerId)
+ && Objects.equals(statusDuration, that.statusDuration)
+ && Objects.equals(otherConcurrentAttempts, that.otherConcurrentAttempts);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ subtaskIndex,
+ status,
+ attempt,
+ host,
+ startTime,
+ startTimeCompatible,
+ endTime,
+ duration,
+ ioMetricsInfo,
+ taskmanagerId,
+ statusDuration,
+ otherConcurrentAttempts);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
index 76ec84509ec..45b469632b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -257,18 +258,19 @@ public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVert
executionVertex.getExecutionState());
continue;
}
- TaskManagerLocation tmLocation = executionVertex.getCurrentAssignedResourceLocation();
- if (tmLocation == null) {
- LOG.trace("ExecutionVertex {} is currently not assigned", executionVertex);
- continue;
- }
- Set<ExecutionAttemptID> groupedAttemptIds =
- executionAttemptsByLocation.getOrDefault(tmLocation, new HashSet<>());
+ for (AccessExecution execution : executionVertex.getCurrentExecutions()) {
+ TaskManagerLocation tmLocation = execution.getAssignedResourceLocation();
+ if (tmLocation == null) {
+ LOG.trace("ExecutionVertex {} is currently not assigned", executionVertex);
+ continue;
+ }
+ Set<ExecutionAttemptID> groupedAttemptIds =
+ executionAttemptsByLocation.getOrDefault(tmLocation, new HashSet<>());
- ExecutionAttemptID attemptId =
- executionVertex.getCurrentExecutionAttempt().getAttemptId();
- groupedAttemptIds.add(attemptId);
- executionAttemptsByLocation.put(tmLocation, ImmutableSet.copyOf(groupedAttemptIds));
+ ExecutionAttemptID attemptId = execution.getAttemptId();
+ groupedAttemptIds.add(attemptId);
+ executionAttemptsByLocation.put(tmLocation, ImmutableSet.copyOf(groupedAttemptIds));
+ }
}
return executionAttemptsByLocation.entrySet().stream()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
index 85bf3a44cd5..93714132681 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
@@ -46,6 +46,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@@ -141,6 +142,63 @@ class JobVertexBackPressureHandlerTest {
});
}
+ private static Collection<MetricDump> getMultipleAttemptsMetricDumps() {
+ Collection<MetricDump> dumps = new ArrayList<>();
+ TaskQueryScopeInfo task0 =
+ new TaskQueryScopeInfo(
+ TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+ TEST_JOB_VERTEX_ID.toString(),
+ 0,
+ 0);
+ dumps.add(new GaugeDump(task0, MetricNames.TASK_BACK_PRESSURED_TIME, "1000"));
+ dumps.add(new GaugeDump(task0, MetricNames.TASK_IDLE_TIME, "0"));
+ dumps.add(new GaugeDump(task0, MetricNames.TASK_BUSY_TIME, "0"));
+
+ TaskQueryScopeInfo speculativeTask0 =
+ new TaskQueryScopeInfo(
+ TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+ TEST_JOB_VERTEX_ID.toString(),
+ 0,
+ 1);
+ dumps.add(new GaugeDump(speculativeTask0, MetricNames.TASK_BACK_PRESSURED_TIME, "200"));
+ dumps.add(new GaugeDump(speculativeTask0, MetricNames.TASK_IDLE_TIME, "100"));
+ dumps.add(new GaugeDump(speculativeTask0, MetricNames.TASK_BUSY_TIME, "800"));
+
+ TaskQueryScopeInfo task1 =
+ new TaskQueryScopeInfo(
+ TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+ TEST_JOB_VERTEX_ID.toString(),
+ 1,
+ 0);
+ dumps.add(new GaugeDump(task1, MetricNames.TASK_BACK_PRESSURED_TIME, "500"));
+ dumps.add(new GaugeDump(task1, MetricNames.TASK_IDLE_TIME, "100"));
+ dumps.add(new GaugeDump(task1, MetricNames.TASK_BUSY_TIME, "900"));
+
+ TaskQueryScopeInfo speculativeTask1 =
+ new TaskQueryScopeInfo(
+ TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+ TEST_JOB_VERTEX_ID.toString(),
+ 1,
+ 1);
+ dumps.add(new GaugeDump(speculativeTask1, MetricNames.TASK_BACK_PRESSURED_TIME, "900"));
+ dumps.add(new GaugeDump(speculativeTask1, MetricNames.TASK_IDLE_TIME, "0"));
+ dumps.add(new GaugeDump(speculativeTask1, MetricNames.TASK_BUSY_TIME, "100"));
+
+ // missing task2
+
+ TaskQueryScopeInfo task3 =
+ new TaskQueryScopeInfo(
+ TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+ TEST_JOB_VERTEX_ID.toString(),
+ 3,
+ 0);
+ dumps.add(new GaugeDump(task3, MetricNames.TASK_BACK_PRESSURED_TIME, "100"));
+ dumps.add(new GaugeDump(task3, MetricNames.TASK_IDLE_TIME, "200"));
+ dumps.add(new GaugeDump(task3, MetricNames.TASK_BUSY_TIME, "700"));
+
+ return dumps;
+ }
+
@Test
void testGetBackPressure() throws Exception {
final Map<String, String> pathParameters = new HashMap<>();
@@ -220,4 +278,153 @@ class JobVertexBackPressureHandlerTest {
assertThat(jobVertexBackPressureInfo.getStatus())
.isEqualTo(VertexBackPressureStatus.DEPRECATED);
}
+
+ @Test
+ void testGetBackPressureFromMultipleCurrentAttempts() throws Exception {
+ MetricStore multipleAttemptsMetricStore = new MetricStore();
+ for (MetricDump metricDump : getMultipleAttemptsMetricDumps()) {
+ multipleAttemptsMetricStore.add(metricDump);
+ }
+ // Update currentExecutionAttempts directly without JobDetails.
+ Map<Integer, Integer> currentExecutionAttempts = new HashMap<>();
+ currentExecutionAttempts.put(0, 1);
+ currentExecutionAttempts.put(1, 0);
+ multipleAttemptsMetricStore
+ .getCurrentExecutionAttempts()
+ .put(
+ TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+ Collections.singletonMap(
+ TEST_JOB_VERTEX_ID.toString(), currentExecutionAttempts));
+
+ JobVertexBackPressureHandler jobVertexBackPressureHandler =
+ new JobVertexBackPressureHandler(
+ () -> CompletableFuture.completedFuture(restfulGateway),
+ Time.seconds(10),
+ Collections.emptyMap(),
+ JobVertexBackPressureHeaders.getInstance(),
+ new MetricFetcher() {
+ private long updateCount = 0;
+
+ @Override
+ public MetricStore getMetricStore() {
+ return multipleAttemptsMetricStore;
+ }
+
+ @Override
+ public void update() {
+ updateCount++;
+ }
+
+ @Override
+ public long getLastUpdateTime() {
+ return updateCount;
+ }
+ });
+
+ final Map<String, String> pathParameters = new HashMap<>();
+ pathParameters.put(
+ JobIDPathParameter.KEY, TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString());
+ pathParameters.put(JobVertexIdPathParameter.KEY, TEST_JOB_VERTEX_ID.toString());
+
+ final HandlerRequest<EmptyRequestBody> request =
+ HandlerRequest.resolveParametersAndCreate(
+ EmptyRequestBody.getInstance(),
+ new JobVertexMessageParameters(),
+ pathParameters,
+ Collections.emptyMap(),
+ Collections.emptyList());
+
+ final CompletableFuture<JobVertexBackPressureInfo>
+ jobVertexBackPressureInfoCompletableFuture =
+ jobVertexBackPressureHandler.handleRequest(request, restfulGateway);
+ final JobVertexBackPressureInfo jobVertexBackPressureInfo =
+ jobVertexBackPressureInfoCompletableFuture.get();
+
+ assertThat(jobVertexBackPressureInfo.getStatus()).isEqualTo(VertexBackPressureStatus.OK);
+ assertThat(jobVertexBackPressureInfo.getBackpressureLevel()).isEqualTo(LOW);
+
+ assertThat(
+ jobVertexBackPressureInfo.getSubtasks().stream()
+ .map(SubtaskBackPressureInfo::getAttemptNumber)
+ .collect(Collectors.toList()))
+ .containsExactly(1, 0, null);
+ assertThat(
+ jobVertexBackPressureInfo.getSubtasks().stream()
+ .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+ .filter(Objects::nonNull)
+ .flatMap(Collection::stream)
+ .map(SubtaskBackPressureInfo::getAttemptNumber)
+ .collect(Collectors.toList()))
+ .containsExactly(0, 1);
+
+ assertThat(
+ jobVertexBackPressureInfo.getSubtasks().stream()
+ .map(SubtaskBackPressureInfo::getBackPressuredRatio)
+ .collect(Collectors.toList()))
+ .containsExactly(0.2, 0.5, 0.1);
+ assertThat(
+ jobVertexBackPressureInfo.getSubtasks().stream()
+ .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+ .filter(Objects::nonNull)
+ .flatMap(Collection::stream)
+ .map(SubtaskBackPressureInfo::getBackPressuredRatio)
+ .collect(Collectors.toList()))
+ .containsExactly(1.0, 0.9);
+
+ assertThat(
+ jobVertexBackPressureInfo.getSubtasks().stream()
+ .map(SubtaskBackPressureInfo::getIdleRatio)
+ .collect(Collectors.toList()))
+ .containsExactly(0.1, 0.1, 0.2);
+ assertThat(
+ jobVertexBackPressureInfo.getSubtasks().stream()
+ .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+ .filter(Objects::nonNull)
+ .flatMap(Collection::stream)
+ .map(SubtaskBackPressureInfo::getIdleRatio)
+ .collect(Collectors.toList()))
+ .containsExactly(0.0, 0.0);
+
+ assertThat(
+ jobVertexBackPressureInfo.getSubtasks().stream()
+ .map(SubtaskBackPressureInfo::getBusyRatio)
+ .collect(Collectors.toList()))
+ .containsExactly(0.8, 0.9, 0.7);
+ assertThat(
+ jobVertexBackPressureInfo.getSubtasks().stream()
+ .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+ .filter(Objects::nonNull)
+ .flatMap(Collection::stream)
+ .map(SubtaskBackPressureInfo::getBusyRatio)
+ .collect(Collectors.toList()))
+ .containsExactly(0.0, 0.1);
+
+ assertThat(
+ jobVertexBackPressureInfo.getSubtasks().stream()
+ .map(SubtaskBackPressureInfo::getBackpressureLevel)
+ .collect(Collectors.toList()))
+ .containsExactly(LOW, LOW, OK);
+ assertThat(
+ jobVertexBackPressureInfo.getSubtasks().stream()
+ .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+ .filter(Objects::nonNull)
+ .flatMap(Collection::stream)
+ .map(SubtaskBackPressureInfo::getBackpressureLevel)
+ .collect(Collectors.toList()))
+ .containsExactly(HIGH, HIGH);
+
+ assertThat(
+ jobVertexBackPressureInfo.getSubtasks().stream()
+ .map(SubtaskBackPressureInfo::getSubtask)
+ .collect(Collectors.toList()))
+ .containsExactly(0, 1, 3);
+ assertThat(
+ jobVertexBackPressureInfo.getSubtasks().stream()
+ .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+ .filter(Objects::nonNull)
+ .flatMap(Collection::stream)
+ .map(SubtaskBackPressureInfo::getSubtask)
+ .collect(Collectors.toList()))
+ .containsExactly(0, 1);
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index 4c267329a74..70f46c1d302 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -192,7 +192,8 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
finishedTs - deployingTs,
ioMetricsInfo,
assignedResourceLocation.getResourceID().getResourceIdString(),
- statusDuration);
+ statusDuration,
+ null);
assertEquals(expectedDetailsInfo, detailsInfo);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index a044f3f87ff..40f2286d722 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -195,7 +195,8 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
-1L,
ioMetricsInfo,
"(unassigned)",
- statusDuration);
+ statusDuration,
+ null);
assertEquals(expectedDetailsInfo, detailsInfo);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/AggregatedTaskDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/AggregatedTaskDetailsInfoTest.java
index 3490a05d029..df3e3fef220 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/AggregatedTaskDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/AggregatedTaskDetailsInfoTest.java
@@ -77,7 +77,8 @@ public class AggregatedTaskDetailsInfoTest
Math.abs(random.nextLong()),
ioMetricsInfo,
"taskmanagerId",
- statusDuration)));
+ statusDuration,
+ null)));
}
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfoTest.java
index 0dc78e48a54..566f668837a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfoTest.java
@@ -34,13 +34,31 @@ public class JobVertexBackPressureInfoTest
List<JobVertexBackPressureInfo.SubtaskBackPressureInfo> subtaskList = new ArrayList<>();
subtaskList.add(
new JobVertexBackPressureInfo.SubtaskBackPressureInfo(
- 0, JobVertexBackPressureInfo.VertexBackPressureLevel.LOW, 0.1, 0.5, 0.4));
+ 0,
+ 0,
+ JobVertexBackPressureInfo.VertexBackPressureLevel.LOW,
+ 0.1,
+ 0.5,
+ 0.4,
+ null));
subtaskList.add(
new JobVertexBackPressureInfo.SubtaskBackPressureInfo(
- 1, JobVertexBackPressureInfo.VertexBackPressureLevel.OK, 0.4, 0.3, 0.3));
+ 1,
+ 0,
+ JobVertexBackPressureInfo.VertexBackPressureLevel.OK,
+ 0.4,
+ 0.3,
+ 0.3,
+ null));
subtaskList.add(
new JobVertexBackPressureInfo.SubtaskBackPressureInfo(
- 2, JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH, 0.9, 0.0, 0.1));
+ 2,
+ 0,
+ JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH,
+ 0.9,
+ 0.0,
+ 0.1,
+ null));
return new JobVertexBackPressureInfo(
JobVertexBackPressureInfo.VertexBackPressureStatus.OK,
JobVertexBackPressureInfo.VertexBackPressureLevel.LOW,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
index 11682f37ceb..00b12f7c68d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetails
import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -71,11 +72,12 @@ public class JobVertexDetailsInfoTest
1L,
jobVertexMetrics,
"taskmanagerId1",
- statusDuration));
+ statusDuration,
+ null));
vertexTaskDetailList.add(
new SubtaskExecutionAttemptDetailsInfo(
1,
- ExecutionState.FAILED,
+ ExecutionState.RUNNING,
random.nextInt(),
"local2",
System.currentTimeMillis(),
@@ -83,7 +85,20 @@ public class JobVertexDetailsInfoTest
1L,
jobVertexMetrics,
"taskmanagerId2",
- statusDuration));
+ statusDuration,
+ Collections.singletonList(
+ new SubtaskExecutionAttemptDetailsInfo(
+ 1,
+ ExecutionState.FAILED,
+ random.nextInt(),
+ "local2",
+ System.currentTimeMillis(),
+ System.currentTimeMillis(),
+ 1L,
+ jobVertexMetrics,
+ "taskmanagerId2",
+ statusDuration,
+ null))));
vertexTaskDetailList.add(
new SubtaskExecutionAttemptDetailsInfo(
2,
@@ -95,7 +110,8 @@ public class JobVertexDetailsInfoTest
1L,
jobVertexMetrics,
"taskmanagerId3",
- statusDuration));
+ statusDuration,
+ null));
int parallelism = 1 + (random.nextInt() / 3);
return new JobVertexDetailsInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
index 48158c05b56..c4110811e0c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
@@ -70,6 +70,7 @@ public class SubtaskExecutionAttemptDetailsInfoTest
Math.abs(random.nextLong()),
ioMetricsInfo,
"taskmanagerId",
- statusDuration);
+ statusDuration,
+ null);
}
}