You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2023/01/05 11:24:37 UTC
[flink] 02/03: [FLINK-30185][rest] Add the subtask index parameter for flame graph handler
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e07aab138578d1f65f2d7260bd33391b15b2012a
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sat Dec 17 22:26:33 2022 +0800
[FLINK-30185][rest] Add the subtask index parameter for flame graph handler
---
.../src/test/resources/rest_api_v1.snapshot | 3 +
.../handler/job/JobVertexFlameGraphHandler.java | 92 +++++++--
.../messages/FlameGraphTypeQueryParameter.java | 4 +-
.../rest/messages/JobVertexFlameGraphHeaders.java | 8 +-
.../messages/JobVertexFlameGraphParameters.java | 7 +-
...meters.java => SubtaskIndexQueryParameter.java} | 31 ++-
.../runtime/webmonitor/WebMonitorEndpoint.java | 4 +-
.../threadinfo/JobVertexThreadInfoTracker.java | 10 +-
.../JobVertexThreadInfoTrackerBuilder.java | 6 +-
.../threadinfo/ThreadInfoRequestCoordinator.java | 11 +-
...VertexFlameGraph.java => VertexFlameGraph.java} | 18 +-
...phFactory.java => VertexFlameGraphFactory.java} | 26 +--
...adInfoStats.java => VertexThreadInfoStats.java} | 9 +-
.../job/JobVertexFlameGraphHandlerTest.java | 216 +++++++++++++++++++++
.../threadinfo/JobVertexThreadInfoTrackerTest.java | 67 +++----
.../ThreadInfoRequestCoordinatorTest.java | 20 +-
16 files changed, 406 insertions(+), 126 deletions(-)
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 9f1a3bec092..de462b9f204 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -2745,6 +2745,9 @@
"queryParameters" : [ {
"key" : "type",
"mandatory" : false
+ }, {
+ "key" : "subtaskindex",
+ "mandatory" : false
} ]
},
"request" : {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandler.java
index eb23f0f1436..516ab8bce91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.job;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -29,28 +30,32 @@ import org.apache.flink.runtime.rest.messages.FlameGraphTypeQueryParameter;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexFlameGraphHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexFlameGraphParameters;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexQueryParameter;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexFlameGraph;
-import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexFlameGraphFactory;
-import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoStats;
-import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTracker;
+import org.apache.flink.runtime.webmonitor.stats.JobVertexStatsTracker;
+import org.apache.flink.runtime.webmonitor.threadinfo.VertexFlameGraph;
+import org.apache.flink.runtime.webmonitor.threadinfo.VertexFlameGraphFactory;
+import org.apache.flink.runtime.webmonitor.threadinfo.VertexThreadInfoStats;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/** Request handler for the job vertex Flame Graph. */
public class JobVertexFlameGraphHandler
- extends AbstractJobVertexHandler<JobVertexFlameGraph, JobVertexFlameGraphParameters> {
+ extends AbstractJobVertexHandler<VertexFlameGraph, JobVertexFlameGraphParameters> {
- private final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> threadInfoOperatorTracker;
+ private final JobVertexStatsTracker<VertexThreadInfoStats> threadInfoOperatorTracker;
public JobVertexFlameGraphHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -58,7 +63,7 @@ public class JobVertexFlameGraphHandler
Map<String, String> responseHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor,
- JobVertexThreadInfoTracker<JobVertexThreadInfoStats> threadInfoOperatorTracker) {
+ JobVertexStatsTracker<VertexThreadInfoStats> threadInfoOperatorTracker) {
super(
leaderRetriever,
timeout,
@@ -70,34 +75,40 @@ public class JobVertexFlameGraphHandler
}
@Override
- protected JobVertexFlameGraph handleRequest(
+ protected VertexFlameGraph handleRequest(
HandlerRequest<EmptyRequestBody> request, AccessExecutionJobVertex jobVertex)
throws RestHandlerException {
- if (jobVertex.getAggregateState().isTerminal()) {
- return JobVertexFlameGraph.terminated();
+ @Nullable Integer subtaskIndex = getSubtaskIndex(request, jobVertex);
+ if (isTerminated(jobVertex, subtaskIndex)) {
+ return VertexFlameGraph.terminated();
}
- final Optional<JobVertexThreadInfoStats> threadInfoSample =
+ Optional<VertexThreadInfoStats> threadInfoSample =
threadInfoOperatorTracker.getVertexStats(
request.getPathParameter(JobIDPathParameter.class), jobVertex);
+ if (subtaskIndex != null) {
+ threadInfoSample =
+ threadInfoSample.map(generateThreadInfoStatsForSubtask(subtaskIndex));
+ }
+
final FlameGraphTypeQueryParameter.Type flameGraphType = getFlameGraphType(request);
- final Optional<JobVertexFlameGraph> operatorFlameGraph;
+ final Optional<VertexFlameGraph> operatorFlameGraph;
switch (flameGraphType) {
case FULL:
operatorFlameGraph =
- threadInfoSample.map(JobVertexFlameGraphFactory::createFullFlameGraphFrom);
+ threadInfoSample.map(VertexFlameGraphFactory::createFullFlameGraphFrom);
break;
case ON_CPU:
operatorFlameGraph =
- threadInfoSample.map(JobVertexFlameGraphFactory::createOnCpuFlameGraph);
+ threadInfoSample.map(VertexFlameGraphFactory::createOnCpuFlameGraph);
break;
case OFF_CPU:
operatorFlameGraph =
- threadInfoSample.map(JobVertexFlameGraphFactory::createOffCpuFlameGraph);
+ threadInfoSample.map(VertexFlameGraphFactory::createOffCpuFlameGraph);
break;
default:
throw new RestHandlerException(
@@ -105,7 +116,28 @@ public class JobVertexFlameGraphHandler
HttpResponseStatus.BAD_REQUEST);
}
- return operatorFlameGraph.orElse(JobVertexFlameGraph.waiting());
+ return operatorFlameGraph.orElse(VertexFlameGraph.waiting());
+ }
+
+ private Function<VertexThreadInfoStats, VertexThreadInfoStats>
+ generateThreadInfoStatsForSubtask(Integer subtaskIndex) {
+ return stats ->
+ new VertexThreadInfoStats(
+ stats.getRequestId(),
+ stats.getStartTime(),
+ stats.getEndTime(),
+ stats.getSamplesBySubtask().entrySet().stream()
+ .filter(entry -> entry.getKey().getSubtaskIndex() == subtaskIndex)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+ }
+
+ private boolean isTerminated(
+ AccessExecutionJobVertex jobVertex, @Nullable Integer subtaskIndex) {
+ if (subtaskIndex == null) {
+ return jobVertex.getAggregateState().isTerminal();
+ }
+ AccessExecutionVertex executionVertex = jobVertex.getTaskVertices()[subtaskIndex];
+ return executionVertex.getExecutionState().isTerminal();
}
private static FlameGraphTypeQueryParameter.Type getFlameGraphType(HandlerRequest<?> request) {
@@ -114,9 +146,27 @@ public class JobVertexFlameGraphHandler
if (flameGraphTypeParameter.isEmpty()) {
return FlameGraphTypeQueryParameter.Type.FULL;
- } else {
- return flameGraphTypeParameter.get(0);
}
+ return flameGraphTypeParameter.get(0);
+ }
+
+ @Nullable
+ private static Integer getSubtaskIndex(
+ HandlerRequest<?> request, AccessExecutionJobVertex jobVertex)
+ throws RestHandlerException {
+ final List<Integer> subtaskIndexParameter =
+ request.getQueryParameter(SubtaskIndexQueryParameter.class);
+
+ if (subtaskIndexParameter.isEmpty()) {
+ return null;
+ }
+ int subtaskIndex = subtaskIndexParameter.get(0);
+ if (subtaskIndex >= jobVertex.getTaskVertices().length || subtaskIndex < 0) {
+ throw new RestHandlerException(
+ "Invalid subtask index for vertex " + jobVertex.getJobVertexId(),
+ HttpResponseStatus.NOT_FOUND);
+ }
+ return subtaskIndex;
}
@Override
@@ -135,7 +185,7 @@ public class JobVertexFlameGraphHandler
extends AbstractRestHandler<
RestfulGateway,
EmptyRequestBody,
- JobVertexFlameGraph,
+ VertexFlameGraph,
JobVertexFlameGraphParameters> {
protected DisabledJobVertexFlameGraphHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -149,10 +199,10 @@ public class JobVertexFlameGraphHandler
}
@Override
- protected CompletableFuture<JobVertexFlameGraph> handleRequest(
+ protected CompletableFuture<VertexFlameGraph> handleRequest(
@Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway)
throws RestHandlerException {
- return CompletableFuture.completedFuture(JobVertexFlameGraph.disabled());
+ return CompletableFuture.completedFuture(VertexFlameGraph.disabled());
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/FlameGraphTypeQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/FlameGraphTypeQueryParameter.java
index ae918fe0fd4..a0625394469 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/FlameGraphTypeQueryParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/FlameGraphTypeQueryParameter.java
@@ -24,10 +24,10 @@ import java.util.Arrays;
public class FlameGraphTypeQueryParameter
extends MessageQueryParameter<FlameGraphTypeQueryParameter.Type> {
- private static final String key = "type";
+ public static final String KEY = "type";
public FlameGraphTypeQueryParameter() {
- super(key, MessageParameterRequisiteness.OPTIONAL);
+ super(KEY, MessageParameterRequisiteness.OPTIONAL);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphHeaders.java
index 401c2202a03..e605d2e5f40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphHeaders.java
@@ -20,14 +20,14 @@ package org.apache.flink.runtime.rest.messages;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.job.JobVertexFlameGraphHandler;
-import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexFlameGraph;
+import org.apache.flink.runtime.webmonitor.threadinfo.VertexFlameGraph;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
/** Message headers for the {@link JobVertexFlameGraphHandler}. */
public class JobVertexFlameGraphHeaders
implements RuntimeMessageHeaders<
- EmptyRequestBody, JobVertexFlameGraph, JobVertexFlameGraphParameters> {
+ EmptyRequestBody, VertexFlameGraph, JobVertexFlameGraphParameters> {
private static final JobVertexFlameGraphHeaders INSTANCE = new JobVertexFlameGraphHeaders();
@@ -44,8 +44,8 @@ public class JobVertexFlameGraphHeaders
}
@Override
- public Class<JobVertexFlameGraph> getResponseClass() {
- return JobVertexFlameGraph.class;
+ public Class<VertexFlameGraph> getResponseClass() {
+ return VertexFlameGraph.class;
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphParameters.java
index 8b5bb945dd0..4fb34f7c3f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphParameters.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.rest.messages;
+import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
/** Message parameters for job vertex Flame Graph REST handler. */
public class JobVertexFlameGraphParameters extends JobVertexMessageParameters {
@@ -27,8 +27,11 @@ public class JobVertexFlameGraphParameters extends JobVertexMessageParameters {
public final FlameGraphTypeQueryParameter flameGraphTypeQueryParameter =
new FlameGraphTypeQueryParameter();
+ public final SubtaskIndexQueryParameter subtaskIndexQueryParameter =
+ new SubtaskIndexQueryParameter();
+
@Override
public Collection<MessageQueryParameter<?>> getQueryParameters() {
- return Collections.singleton(flameGraphTypeQueryParameter);
+ return Arrays.asList(flameGraphTypeQueryParameter, subtaskIndexQueryParameter);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexQueryParameter.java
similarity index 50%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphParameters.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexQueryParameter.java
index 8b5bb945dd0..08c1d0e3ff7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexQueryParameter.java
@@ -18,17 +18,32 @@
package org.apache.flink.runtime.rest.messages;
-import java.util.Collection;
-import java.util.Collections;
+/** Query parameter specifying the index of a subtask. */
+public class SubtaskIndexQueryParameter extends MessageQueryParameter<Integer> {
-/** Message parameters for job vertex Flame Graph REST handler. */
-public class JobVertexFlameGraphParameters extends JobVertexMessageParameters {
+ public static final String KEY = "subtaskindex";
- public final FlameGraphTypeQueryParameter flameGraphTypeQueryParameter =
- new FlameGraphTypeQueryParameter();
+ public SubtaskIndexQueryParameter() {
+ super(KEY, MessageParameterRequisiteness.OPTIONAL);
+ }
+
+ @Override
+ public Integer convertStringToValue(String value) throws ConversionException {
+ final int subtaskIndex = Integer.parseInt(value);
+ if (subtaskIndex >= 0) {
+ return subtaskIndex;
+ } else {
+ throw new ConversionException("subtaskindex must be positive, was: " + subtaskIndex);
+ }
+ }
+
+ @Override
+ public String convertValueToString(Integer value) {
+ return value.toString();
+ }
@Override
- public Collection<MessageQueryParameter<?>> getQueryParameters() {
- return Collections.singleton(flameGraphTypeQueryParameter);
+ public String getDescription() {
+ return "Positive integer value that identifies a subtask.";
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 259495a3f33..1ab048995aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -148,10 +148,10 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoStats;
import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTracker;
import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTrackerBuilder;
import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoRequestCoordinator;
+import org.apache.flink.runtime.webmonitor.threadinfo.VertexThreadInfoStats;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
@@ -241,7 +241,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
}
- private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> initializeThreadInfoTracker(
+ private JobVertexThreadInfoTracker<VertexThreadInfoStats> initializeThreadInfoTracker(
ScheduledExecutorService executor) {
final Duration akkaTimeout = clusterConfiguration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
index 6933655c0b4..95350f72ae9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
@@ -77,7 +77,7 @@ public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVert
@GuardedBy("lock")
private final ThreadInfoRequestCoordinator coordinator;
- private final Function<JobVertexThreadInfoStats, T> createStatsFn;
+ private final Function<VertexThreadInfoStats, T> createStatsFn;
private final ExecutorService executor;
@@ -108,7 +108,7 @@ public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVert
JobVertexThreadInfoTracker(
ThreadInfoRequestCoordinator coordinator,
GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
- Function<JobVertexThreadInfoStats, T> createStatsFn,
+ Function<VertexThreadInfoStats, T> createStatsFn,
ScheduledExecutorService executor,
Duration cleanUpInterval,
int numSamples,
@@ -193,7 +193,7 @@ public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVert
final CompletableFuture<ResourceManagerGateway> gatewayFuture =
resourceManagerGatewayRetriever.getFuture();
- CompletableFuture<JobVertexThreadInfoStats> sample =
+ CompletableFuture<VertexThreadInfoStats> sample =
gatewayFuture.thenCompose(
(ResourceManagerGateway resourceManagerGateway) ->
coordinator.triggerThreadInfoRequest(
@@ -334,7 +334,7 @@ public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVert
/** Callback on completed thread info sample. */
private class ThreadInfoSampleCompletionCallback
- implements BiConsumer<JobVertexThreadInfoStats, Throwable> {
+ implements BiConsumer<VertexThreadInfoStats, Throwable> {
private final Key key;
private final AccessExecutionJobVertex vertex;
@@ -345,7 +345,7 @@ public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVert
}
@Override
- public void accept(JobVertexThreadInfoStats threadInfoStats, Throwable throwable) {
+ public void accept(VertexThreadInfoStats threadInfoStats, Throwable throwable) {
synchronized (lock) {
try {
if (shutDown) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerBuilder.java
index 86e15334a05..530654fbcda 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerBuilder.java
@@ -42,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
public class JobVertexThreadInfoTrackerBuilder<T extends Statistics> {
private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
- private final Function<JobVertexThreadInfoStats, T> createStatsFn;
+ private final Function<VertexThreadInfoStats, T> createStatsFn;
private final ScheduledExecutorService executor;
private final Time restTimeout;
@@ -56,7 +56,7 @@ public class JobVertexThreadInfoTrackerBuilder<T extends Statistics> {
JobVertexThreadInfoTrackerBuilder(
GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
- Function<JobVertexThreadInfoStats, T> createStatsFn,
+ Function<VertexThreadInfoStats, T> createStatsFn,
ScheduledExecutorService executor,
Time restTimeout) {
this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
@@ -188,7 +188,7 @@ public class JobVertexThreadInfoTrackerBuilder<T extends Statistics> {
*/
public static <T extends Statistics> JobVertexThreadInfoTrackerBuilder<T> newBuilder(
GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
- Function<JobVertexThreadInfoStats, T> createStatsFn,
+ Function<VertexThreadInfoStats, T> createStatsFn,
ScheduledExecutorService executor,
Time restTimeout) {
return new JobVertexThreadInfoTrackerBuilder<>(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java
index eb0929124d5..8f080f41f8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java
@@ -42,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/** A coordinator for triggering and collecting thread info stats of running job vertex subtasks. */
public class ThreadInfoRequestCoordinator
extends TaskStatsRequestCoordinator<
- Map<ExecutionAttemptID, Collection<ThreadInfoSample>>, JobVertexThreadInfoStats> {
+ Map<ExecutionAttemptID, Collection<ThreadInfoSample>>, VertexThreadInfoStats> {
/**
* Creates a new coordinator for the job.
@@ -68,7 +68,7 @@ public class ThreadInfoRequestCoordinator
* samples.
* @return A future of the completed thread info stats.
*/
- public CompletableFuture<JobVertexThreadInfoStats> triggerThreadInfoRequest(
+ public CompletableFuture<VertexThreadInfoStats> triggerThreadInfoRequest(
Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
executionsWithGateways,
int numSamples,
@@ -161,8 +161,7 @@ public class ThreadInfoRequestCoordinator
private static class PendingThreadInfoRequest
extends PendingStatsRequest<
- Map<ExecutionAttemptID, Collection<ThreadInfoSample>>,
- JobVertexThreadInfoStats> {
+ Map<ExecutionAttemptID, Collection<ThreadInfoSample>>, VertexThreadInfoStats> {
PendingThreadInfoRequest(
int requestId, Collection<? extends Set<ExecutionAttemptID>> tasksToCollect) {
@@ -170,13 +169,13 @@ public class ThreadInfoRequestCoordinator
}
@Override
- protected JobVertexThreadInfoStats assembleCompleteStats(long endTime) {
+ protected VertexThreadInfoStats assembleCompleteStats(long endTime) {
HashMap<ExecutionAttemptID, Collection<ThreadInfoSample>> samples = new HashMap<>();
for (Map<ExecutionAttemptID, Collection<ThreadInfoSample>> map :
statsResultByTaskGroup.values()) {
samples.putAll(map);
}
- return new JobVertexThreadInfoStats(requestId, startTime, endTime, samples);
+ return new VertexThreadInfoStats(requestId, startTime, endTime, samples);
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexFlameGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraph.java
similarity index 90%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexFlameGraph.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraph.java
index 96fe0a21492..2bd160e186c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexFlameGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraph.java
@@ -28,12 +28,12 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
import java.util.List;
/**
- * Flame Graph representation for a job vertex.
+ * Flame Graph representation for a job vertex or an execution vertex.
*
* <p>Statistics are gathered by sampling stack traces of running tasks.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
-public class JobVertexFlameGraph implements ResponseBody {
+public class VertexFlameGraph implements ResponseBody {
private static final String FIELD_NAME_END_TIMESTAMP = "endTimestamp";
private static final String FIELD_NAME_DATA = "data";
@@ -46,7 +46,7 @@ public class JobVertexFlameGraph implements ResponseBody {
private final Node root;
@JsonCreator
- public JobVertexFlameGraph(
+ public VertexFlameGraph(
@JsonProperty(FIELD_NAME_END_TIMESTAMP) long endTimestamp,
@JsonProperty(FIELD_NAME_DATA) Node root) {
this.endTimestamp = endTimestamp;
@@ -69,18 +69,18 @@ public class JobVertexFlameGraph implements ResponseBody {
}
// Indicates that the task execution has been terminated
- public static JobVertexFlameGraph terminated() {
- return new JobVertexFlameGraph(-1, null);
+ public static VertexFlameGraph terminated() {
+ return new VertexFlameGraph(-1, null);
}
// Indicates that the flame graph feature has been disabled
- public static JobVertexFlameGraph disabled() {
- return new JobVertexFlameGraph(-2, null);
+ public static VertexFlameGraph disabled() {
+ return new VertexFlameGraph(-2, null);
}
// Indicates that it is waiting for the first samples to creating the flame graph
- public static JobVertexFlameGraph waiting() {
- return new JobVertexFlameGraph(-3, null);
+ public static VertexFlameGraph waiting() {
+ return new VertexFlameGraph(-3, null);
}
/** Graph node. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexFlameGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java
similarity index 80%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexFlameGraphFactory.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java
index 07f98fe33b0..35f06663274 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexFlameGraphFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java
@@ -30,21 +30,21 @@ import java.util.Map;
import java.util.Set;
/** Factory class for creating Flame Graph representations. */
-public class JobVertexFlameGraphFactory {
+public class VertexFlameGraphFactory {
/**
- * Converts {@link JobVertexThreadInfoStats} into a FlameGraph.
+ * Converts {@link VertexThreadInfoStats} into a FlameGraph.
*
* @param sample Thread details sample containing stack traces.
* @return FlameGraph data structure
*/
- public static JobVertexFlameGraph createFullFlameGraphFrom(JobVertexThreadInfoStats sample) {
+ public static VertexFlameGraph createFullFlameGraphFrom(VertexThreadInfoStats sample) {
EnumSet<Thread.State> included = EnumSet.allOf(Thread.State.class);
return createFlameGraphFromSample(sample, included);
}
/**
- * Converts {@link JobVertexThreadInfoStats} into a FlameGraph representing blocked (Off-CPU)
+ * Converts {@link VertexThreadInfoStats} into a FlameGraph representing blocked (Off-CPU)
* threads.
*
* <p>Includes threads in states Thread.State.[TIMED_WAITING, BLOCKED, WAITING].
@@ -52,14 +52,14 @@ public class JobVertexFlameGraphFactory {
* @param sample Thread details sample containing stack traces.
* @return FlameGraph data structure.
*/
- public static JobVertexFlameGraph createOffCpuFlameGraph(JobVertexThreadInfoStats sample) {
+ public static VertexFlameGraph createOffCpuFlameGraph(VertexThreadInfoStats sample) {
EnumSet<Thread.State> included =
EnumSet.of(Thread.State.TIMED_WAITING, Thread.State.BLOCKED, Thread.State.WAITING);
return createFlameGraphFromSample(sample, included);
}
/**
- * Converts {@link JobVertexThreadInfoStats} into a FlameGraph representing actively running
+ * Converts {@link VertexThreadInfoStats} into a FlameGraph representing actively running
* (On-CPU) threads.
*
* <p>Includes threads in states Thread.State.[RUNNABLE, NEW].
@@ -67,13 +67,13 @@ public class JobVertexFlameGraphFactory {
* @param sample Thread details sample containing stack traces.
* @return FlameGraph data structure
*/
- public static JobVertexFlameGraph createOnCpuFlameGraph(JobVertexThreadInfoStats sample) {
+ public static VertexFlameGraph createOnCpuFlameGraph(VertexThreadInfoStats sample) {
EnumSet<Thread.State> included = EnumSet.of(Thread.State.RUNNABLE, Thread.State.NEW);
return createFlameGraphFromSample(sample, included);
}
- private static JobVertexFlameGraph createFlameGraphFromSample(
- JobVertexThreadInfoStats sample, Set<Thread.State> threadStates) {
+ private static VertexFlameGraph createFlameGraphFromSample(
+ VertexThreadInfoStats sample, Set<Thread.State> threadStates) {
final NodeBuilder root = new NodeBuilder("root");
for (Collection<ThreadInfoSample> threadInfoSubSamples :
sample.getSamplesBySubtask().values()) {
@@ -94,7 +94,7 @@ public class JobVertexFlameGraphFactory {
}
}
}
- return new JobVertexFlameGraph(sample.getEndTime(), root.toNode());
+ return new VertexFlameGraph(sample.getEndTime(), root.toNode());
}
private static class NodeBuilder {
@@ -119,12 +119,12 @@ public class JobVertexFlameGraphFactory {
hitCount++;
}
- private JobVertexFlameGraph.Node toNode() {
- final List<JobVertexFlameGraph.Node> childrenNodes = new ArrayList<>(children.size());
+ private VertexFlameGraph.Node toNode() {
+ final List<VertexFlameGraph.Node> childrenNodes = new ArrayList<>(children.size());
for (NodeBuilder builderChild : children.values()) {
childrenNodes.add(builderChild.toNode());
}
- return new JobVertexFlameGraph.Node(
+ return new VertexFlameGraph.Node(
stackTraceLocation, hitCount, Collections.unmodifiableList(childrenNodes));
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexThreadInfoStats.java
similarity index 94%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoStats.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexThreadInfoStats.java
index f035bebc67b..d56945888c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexThreadInfoStats.java
@@ -28,11 +28,8 @@ import java.util.Map;
import static org.apache.flink.util.Preconditions.checkArgument;
-/**
- * Thread info statistics of multiple tasks. Each subtask can deliver multiple samples for
- * statistical purposes.
- */
-public class JobVertexThreadInfoStats implements Statistics {
+/** Thread info statistics of a single JobVertex or ExecutionVertex. */
+public class VertexThreadInfoStats implements Statistics {
/** ID of the corresponding request. */
private final int requestId;
@@ -54,7 +51,7 @@ public class JobVertexThreadInfoStats implements Statistics {
* @param endTime Timestamp, when all thread info samples were collected.
* @param samplesBySubtask Map of thread info samples by subtask (execution ID).
*/
- public JobVertexThreadInfoStats(
+ public VertexThreadInfoStats(
int requestId,
long startTime,
long endTime,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandlerTest.java
new file mode 100644
index 00000000000..20e76022b3a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandlerTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionHistory;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.FlameGraphTypeQueryParameter;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexFlameGraphParameters;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexQueryParameter;
+import org.apache.flink.runtime.webmonitor.stats.JobVertexStatsTracker;
+import org.apache.flink.runtime.webmonitor.threadinfo.VertexFlameGraph;
+import org.apache.flink.runtime.webmonitor.threadinfo.VertexThreadInfoStats;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests of {@link JobVertexFlameGraphHandler}.
+ *
+ * <p>The handler is called directly through {@code handleRequest} with archived execution
+ * vertices and a stub stats tracker. Each test only asserts the end time of the returned
+ * {@link VertexFlameGraph}.
+ */
+public class JobVertexFlameGraphHandlerTest extends TestLogger {
+
+ // IDs shared by all tests; both are put into the path parameters of every generated request.
+ private static final JobID JOB_ID = new JobID();
+ private static final JobVertexID JOB_VERTEX_ID = new JobVertexID();
+
+ // Stats handed to the stub tracker; its end time is the expected value for running vertices.
+ private static VertexThreadInfoStats taskThreadInfoStatsDefaultSample;
+ // Handler under test, created once in setUp().
+ private static JobVertexFlameGraphHandler handler;
+
+ /**
+ * Creates the shared default stats sample (without any per-subtask samples) and the handler
+ * under test, backed by a {@link TestThreadInfoTracker} that always returns that sample.
+ */
+ @BeforeAll
+ public static void setUp() {
+ // Arguments: request id, start time, end time, samples by subtask (none here).
+ taskThreadInfoStatsDefaultSample =
+ new VertexThreadInfoStats(
+ 8,
+ System.currentTimeMillis(),
+ System.currentTimeMillis() + 100,
+ Collections.emptyMap());
+
+ final RestHandlerConfiguration restHandlerConfiguration =
+ RestHandlerConfiguration.fromConfiguration(new Configuration());
+ // NOTE(review): the leading arguments (null-returning supplier, 100 ms Time, empty map) are
+ // presumably the gateway retriever, request timeout and response headers - confirm against
+ // the JobVertexFlameGraphHandler constructor.
+ handler =
+ new JobVertexFlameGraphHandler(
+ () -> null,
+ Time.milliseconds(100L),
+ Collections.emptyMap(),
+ new DefaultExecutionGraphCache(
+ restHandlerConfiguration.getTimeout(),
+ Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),
+ Executors.directExecutor(),
+ new TestThreadInfoTracker(taskThreadInfoStatsDefaultSample));
+ }
+
+ /**
+ * Some subtasks are finished, some subtasks are running.
+ *
+ * <p>Subtask 0 is FINISHED and subtask 1 is RUNNING. Requesting subtask 0 must yield the end
+ * time of {@link VertexFlameGraph#terminated()}, while requesting subtask 1 or the whole job
+ * vertex (no subtask index) must yield the end time of the default stats sample.
+ */
+ @Test
+ void testHandleMixedSubtasks() throws Exception {
+ final ArchivedExecutionJobVertex archivedExecutionJobVertex =
+ new ArchivedExecutionJobVertex(
+ new ArchivedExecutionVertex[] {
+ generateExecutionVertex(0, ExecutionState.FINISHED),
+ generateExecutionVertex(1, ExecutionState.RUNNING)
+ },
+ JOB_VERTEX_ID,
+ "test",
+ 2,
+ 2,
+ ResourceProfile.UNKNOWN,
+ new StringifiedAccumulatorResult[0]);
+
+ // Check the finished subtask: expect the end time of the "terminated" flame graph
+ HandlerRequest<EmptyRequestBody> request = generateJobVertexFlameGraphParameters(0);
+ VertexFlameGraph jobVertexFlameGraph =
+ handler.handleRequest(request, archivedExecutionJobVertex);
+ assertThat(jobVertexFlameGraph.getEndTime())
+ .isEqualTo(VertexFlameGraph.terminated().getEndTime());
+
+ // Check the running subtask: expect the end time of the sample served by the stub tracker
+ request = generateJobVertexFlameGraphParameters(1);
+ jobVertexFlameGraph = handler.handleRequest(request, archivedExecutionJobVertex);
+ assertThat(jobVertexFlameGraph.getEndTime())
+ .isEqualTo(taskThreadInfoStatsDefaultSample.getEndTime());
+
+ // Check the job vertex (no subtask index): also expect the end time of the served sample
+ request = generateJobVertexFlameGraphParameters(null);
+ jobVertexFlameGraph = handler.handleRequest(request, archivedExecutionJobVertex);
+ assertThat(jobVertexFlameGraph.getEndTime())
+ .isEqualTo(taskThreadInfoStatsDefaultSample.getEndTime());
+ }
+
+ /**
+ * All subtasks are finished.
+ *
+ * <p>A request for the whole job vertex (no subtask index) must then yield the end time of
+ * {@link VertexFlameGraph#terminated()}.
+ */
+ @Test
+ void testHandleFinishedJobVertex() throws Exception {
+ final ArchivedExecutionJobVertex archivedExecutionJobVertex =
+ new ArchivedExecutionJobVertex(
+ new ArchivedExecutionVertex[] {
+ generateExecutionVertex(0, ExecutionState.FINISHED),
+ generateExecutionVertex(1, ExecutionState.FINISHED)
+ },
+ JOB_VERTEX_ID,
+ "test",
+ 2,
+ 2,
+ ResourceProfile.UNKNOWN,
+ new StringifiedAccumulatorResult[0]);
+
+ HandlerRequest<EmptyRequestBody> request = generateJobVertexFlameGraphParameters(null);
+ VertexFlameGraph jobVertexFlameGraph =
+ handler.handleRequest(request, archivedExecutionJobVertex);
+ assertThat(jobVertexFlameGraph.getEndTime())
+ .isEqualTo(VertexFlameGraph.terminated().getEndTime());
+ }
+
+ /**
+ * Builds a handler request for {@link #JOB_ID} and {@link #JOB_VERTEX_ID} that asks for the
+ * {@code FULL} flame graph type.
+ *
+ * @param subtaskIndex subtask index to send as query parameter, or {@code null} to omit the
+ * parameter
+ * @return the request created by {@link HandlerRequest#resolveParametersAndCreate}
+ * @throws HandlerRequestException propagated from {@link
+ * HandlerRequest#resolveParametersAndCreate}
+ */
+ private HandlerRequest<EmptyRequestBody> generateJobVertexFlameGraphParameters(
+ Integer subtaskIndex) throws HandlerRequestException {
+ final HashMap<String, String> receivedPathParameters = new HashMap<>(2);
+ receivedPathParameters.put(JobIDPathParameter.KEY, JOB_ID.toString());
+ receivedPathParameters.put(JobVertexIdPathParameter.KEY, JOB_VERTEX_ID.toString());
+
+ Map<String, List<String>> queryParams = new HashMap<>(2);
+ queryParams.put(
+ FlameGraphTypeQueryParameter.KEY,
+ Collections.singletonList(FlameGraphTypeQueryParameter.Type.FULL.name()));
+ // The subtask index parameter is only added when an index was given.
+ if (subtaskIndex != null) {
+ queryParams.put(
+ SubtaskIndexQueryParameter.KEY,
+ Collections.singletonList(subtaskIndex.toString()));
+ }
+
+ return HandlerRequest.resolveParametersAndCreate(
+ EmptyRequestBody.getInstance(),
+ new JobVertexFlameGraphParameters(),
+ receivedPathParameters,
+ queryParams,
+ Collections.emptyList());
+ }
+
+ /**
+ * Creates an archived execution vertex named "test task" that wraps one archived execution in
+ * the given state.
+ *
+ * @param subtaskIndex index passed to the vertex and used to create the execution attempt id
+ * @param executionState state of the wrapped archived execution
+ * @return the archived execution vertex
+ */
+ private ArchivedExecutionVertex generateExecutionVertex(
+ int subtaskIndex, ExecutionState executionState) {
+ // NOTE(review): the null arguments are presumably optional metrics/failure/location data
+ // that these tests do not need - confirm against the ArchivedExecution constructor.
+ // Both long arrays hold one zero-initialized slot per ExecutionState value.
+ return new ArchivedExecutionVertex(
+ subtaskIndex,
+ "test task",
+ new ArchivedExecution(
+ new StringifiedAccumulatorResult[0],
+ null,
+ createExecutionAttemptId(JOB_VERTEX_ID, subtaskIndex, 0),
+ executionState,
+ null,
+ null,
+ null,
+ new long[ExecutionState.values().length],
+ new long[ExecutionState.values().length]),
+ new ExecutionHistory(0));
+ }
+
+ /**
+ * A {@link JobVertexStatsTracker} which returns the pre-generated thread info stats directly.
+ */
+ private static class TestThreadInfoTracker
+ implements JobVertexStatsTracker<VertexThreadInfoStats> {
+
+ // Stats returned for every lookup.
+ private final VertexThreadInfoStats stats;
+
+ public TestThreadInfoTracker(VertexThreadInfoStats stats) {
+ this.stats = stats;
+ }
+
+ // Ignores both arguments and always returns the pre-generated stats.
+ @Override
+ public Optional<VertexThreadInfoStats> getVertexStats(
+ JobID jobId, AccessExecutionJobVertex vertex) {
+ return Optional.of(stats);
+ }
+
+ // Intentionally empty.
+ @Override
+ public void shutDown() throws FlinkException {}
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
index d45aff28279..afd51257099 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
@@ -89,7 +89,7 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
.collect(Collectors.toSet());
private static final JobID JOB_ID = new JobID();
- private static JobVertexThreadInfoStats threadInfoStatsDefaultSample;
+ private static VertexThreadInfoStats threadInfoStatsDefaultSample;
private static final Duration CLEAN_UP_INTERVAL = Duration.ofSeconds(60);
private static final Duration STATS_REFRESH_INTERVAL = Duration.ofSeconds(60);
@@ -128,16 +128,16 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
/** Tests that cached result is reused within refresh interval. */
@Test
public void testCachedStatsNotUpdatedWithinRefreshInterval() throws Exception {
- final JobVertexThreadInfoStats unusedThreadInfoStats = createThreadInfoStats(1, TIME_GAP);
+ final VertexThreadInfoStats unusedThreadInfoStats = createThreadInfoStats(1, TIME_GAP);
- final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
+ final JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker =
createThreadInfoTracker(
STATS_REFRESH_INTERVAL,
threadInfoStatsDefaultSample,
unusedThreadInfoStats);
// stores threadInfoStatsDefaultSample in cache
doInitialRequestAndVerifyResult(tracker);
- Optional<JobVertexThreadInfoStats> result =
+ Optional<VertexThreadInfoStats> result =
tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
// cached result is returned instead of unusedThreadInfoStats
assertThat(result).isPresent().hasValue(threadInfoStatsDefaultSample);
@@ -149,19 +149,19 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
final Duration shortRefreshInterval = Duration.ofMillis(1);
// first entry is in the past, so refresh is triggered immediately upon fetching it
- final JobVertexThreadInfoStats initialThreadInfoStats =
+ final VertexThreadInfoStats initialThreadInfoStats =
createThreadInfoStats(
Instant.now().minus(10, ChronoUnit.SECONDS),
REQUEST_ID,
Duration.ofMillis(5));
- final JobVertexThreadInfoStats threadInfoStatsAfterRefresh =
+ final VertexThreadInfoStats threadInfoStatsAfterRefresh =
createThreadInfoStats(1, TIME_GAP);
// register a CountDownLatch with the cache so we can await refresh of the entry
CountDownLatch cacheRefreshed = new CountDownLatch(1);
- Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> vertexStatsCache =
+ Cache<JobVertexThreadInfoTracker.Key, VertexThreadInfoStats> vertexStatsCache =
createCache(CLEAN_UP_INTERVAL, new LatchRemovalListener<>(cacheRefreshed));
- final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
+ final JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker =
createThreadInfoTracker(
CLEAN_UP_INTERVAL,
shortRefreshInterval,
@@ -182,7 +182,7 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
cacheRefreshed.await();
// verify that we get the second result on the next request
- Optional<JobVertexThreadInfoStats> result =
+ Optional<VertexThreadInfoStats> result =
tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
assertExpectedEqualsReceived(threadInfoStatsAfterRefresh, result);
}
@@ -194,9 +194,9 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
// register a CountDownLatch with the cache so we can await expiry of the entry
CountDownLatch cacheExpired = new CountDownLatch(1);
- Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> vertexStatsCache =
+ Cache<JobVertexThreadInfoTracker.Key, VertexThreadInfoStats> vertexStatsCache =
createCache(shortCleanUpInterval, new LatchRemovalListener<>(cacheExpired));
- final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
+ final JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker =
createThreadInfoTracker(
shortCleanUpInterval,
STATS_REFRESH_INTERVAL,
@@ -214,8 +214,7 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
/** Tests that cached results are NOT removed within the cleanup interval. */
@Test
public void testCachedStatsNotCleanedWithinCleanupInterval() throws Exception {
- final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
- createThreadInfoTracker();
+ final JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker = createThreadInfoTracker();
doInitialRequestAndVerifyResult(tracker);
@@ -228,8 +227,7 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
/** Tests that cached results are not served after the shutdown. */
@Test
public void testShutDown() throws Exception {
- final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
- createThreadInfoTracker();
+ final JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker = createThreadInfoTracker();
doInitialRequestAndVerifyResult(tracker);
// shutdown directly
@@ -241,9 +239,9 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
assertThat(tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX)).isNotPresent();
}
- private Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> createCache(
+ private Cache<JobVertexThreadInfoTracker.Key, VertexThreadInfoStats> createCache(
Duration cleanUpInterval,
- RemovalListener<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats>
+ RemovalListener<JobVertexThreadInfoTracker.Key, VertexThreadInfoStats>
removalListener) {
return CacheBuilder.newBuilder()
.concurrencyLevel(1)
@@ -253,7 +251,7 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
}
private void doInitialRequestAndVerifyResult(
- JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker)
+ JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker)
throws InterruptedException, ExecutionException {
// no stats yet, but the request triggers async collection of stats
assertThat(tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX)).isNotPresent();
@@ -264,10 +262,9 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
}
private static void assertExpectedEqualsReceived(
- JobVertexThreadInfoStats expected,
- Optional<JobVertexThreadInfoStats> receivedOptional) {
+ VertexThreadInfoStats expected, Optional<VertexThreadInfoStats> receivedOptional) {
assertThat(receivedOptional).isPresent();
- JobVertexThreadInfoStats received = receivedOptional.get();
+ VertexThreadInfoStats received = receivedOptional.get();
assertThat(expected.getRequestId()).isEqualTo(received.getRequestId());
assertThat(expected.getEndTime()).isEqualTo(received.getEndTime());
@@ -279,20 +276,20 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
}
}
- private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> createThreadInfoTracker() {
+ private JobVertexThreadInfoTracker<VertexThreadInfoStats> createThreadInfoTracker() {
return createThreadInfoTracker(STATS_REFRESH_INTERVAL, threadInfoStatsDefaultSample);
}
- private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> createThreadInfoTracker(
- Duration statsRefreshInterval, JobVertexThreadInfoStats... stats) {
+ private JobVertexThreadInfoTracker<VertexThreadInfoStats> createThreadInfoTracker(
+ Duration statsRefreshInterval, VertexThreadInfoStats... stats) {
return createThreadInfoTracker(CLEAN_UP_INTERVAL, statsRefreshInterval, null, stats);
}
- private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> createThreadInfoTracker(
+ private JobVertexThreadInfoTracker<VertexThreadInfoStats> createThreadInfoTracker(
Duration cleanUpInterval,
Duration statsRefreshInterval,
- Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> vertexStatsCache,
- JobVertexThreadInfoStats... stats) {
+ Cache<JobVertexThreadInfoTracker.Key, VertexThreadInfoStats> vertexStatsCache,
+ VertexThreadInfoStats... stats) {
final ThreadInfoRequestCoordinator coordinator =
new TestingThreadInfoRequestCoordinator(Runnable::run, REQUEST_TIMEOUT, stats);
@@ -311,11 +308,11 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
.build();
}
- private static JobVertexThreadInfoStats createThreadInfoStats(int requestId, Duration timeGap) {
+ private static VertexThreadInfoStats createThreadInfoStats(int requestId, Duration timeGap) {
return createThreadInfoStats(Instant.now(), requestId, timeGap);
}
- private static JobVertexThreadInfoStats createThreadInfoStats(
+ private static VertexThreadInfoStats createThreadInfoStats(
Instant startTime, int requestId, Duration timeGap) {
Instant endTime = startTime.plus(timeGap);
@@ -331,7 +328,7 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
Collections.singletonList(threadInfoSample.get()));
}
- return new JobVertexThreadInfoStats(
+ return new VertexThreadInfoStats(
requestId, startTime.toEpochMilli(), endTime.toEpochMilli(), samples);
}
@@ -372,19 +369,19 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
*/
private static class TestingThreadInfoRequestCoordinator extends ThreadInfoRequestCoordinator {
- private final JobVertexThreadInfoStats[] jobVertexThreadInfoStats;
+ private final VertexThreadInfoStats[] vertexThreadInfoStats;
private int counter = 0;
TestingThreadInfoRequestCoordinator(
Executor executor,
Duration requestTimeout,
- JobVertexThreadInfoStats... jobVertexThreadInfoStats) {
+ VertexThreadInfoStats... vertexThreadInfoStats) {
super(executor, requestTimeout);
- this.jobVertexThreadInfoStats = jobVertexThreadInfoStats;
+ this.vertexThreadInfoStats = vertexThreadInfoStats;
}
@Override
- public CompletableFuture<JobVertexThreadInfoStats> triggerThreadInfoRequest(
+ public CompletableFuture<VertexThreadInfoStats> triggerThreadInfoRequest(
Map<
ImmutableSet<ExecutionAttemptID>,
CompletableFuture<TaskExecutorThreadInfoGateway>>
@@ -396,7 +393,7 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
assertThat(executionsWithGateways.keySet().iterator().next()).isEqualTo(ATTEMPT_IDS);
return CompletableFuture.completedFuture(
- jobVertexThreadInfoStats[(counter++) % jobVertexThreadInfoStats.length]);
+ vertexThreadInfoStats[(counter++) % vertexThreadInfoStats.length]);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinatorTest.java
index 147195ea859..f937ccd1c5d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinatorTest.java
@@ -104,14 +104,14 @@ public class ThreadInfoRequestCoordinatorTest extends TestLogger {
createMockSubtaskWithGateways(
CompletionType.SUCCESSFULLY, CompletionType.SUCCESSFULLY);
- CompletableFuture<JobVertexThreadInfoStats> requestFuture =
+ CompletableFuture<VertexThreadInfoStats> requestFuture =
coordinator.triggerThreadInfoRequest(
executionWithGateways,
DEFAULT_NUMBER_OF_SAMPLES,
DEFAULT_DELAY_BETWEEN_SAMPLES,
DEFAULT_MAX_STACK_TRACE_DEPTH);
- JobVertexThreadInfoStats threadInfoStats = requestFuture.get();
+ VertexThreadInfoStats threadInfoStats = requestFuture.get();
// verify the request result
assertThat(threadInfoStats.getRequestId()).isEqualTo(0);
@@ -133,7 +133,7 @@ public class ThreadInfoRequestCoordinatorTest extends TestLogger {
createMockSubtaskWithGateways(
CompletionType.SUCCESSFULLY, CompletionType.EXCEPTIONALLY);
- CompletableFuture<JobVertexThreadInfoStats> requestFuture =
+ CompletableFuture<VertexThreadInfoStats> requestFuture =
coordinator.triggerThreadInfoRequest(
executionWithGateways,
DEFAULT_NUMBER_OF_SAMPLES,
@@ -156,7 +156,7 @@ public class ThreadInfoRequestCoordinatorTest extends TestLogger {
createMockSubtaskWithGateways(
CompletionType.SUCCESSFULLY, CompletionType.TIMEOUT);
- CompletableFuture<JobVertexThreadInfoStats> requestFuture =
+ CompletableFuture<VertexThreadInfoStats> requestFuture =
coordinator.triggerThreadInfoRequest(
executionWithGateways,
DEFAULT_NUMBER_OF_SAMPLES,
@@ -184,16 +184,16 @@ public class ThreadInfoRequestCoordinatorTest extends TestLogger {
createMockSubtaskWithGateways(
CompletionType.SUCCESSFULLY, CompletionType.TIMEOUT);
- List<CompletableFuture<JobVertexThreadInfoStats>> requestFutures = new ArrayList<>();
+ List<CompletableFuture<VertexThreadInfoStats>> requestFutures = new ArrayList<>();
- CompletableFuture<JobVertexThreadInfoStats> requestFuture1 =
+ CompletableFuture<VertexThreadInfoStats> requestFuture1 =
coordinator.triggerThreadInfoRequest(
executionWithGateways,
DEFAULT_NUMBER_OF_SAMPLES,
DEFAULT_DELAY_BETWEEN_SAMPLES,
DEFAULT_MAX_STACK_TRACE_DEPTH);
- CompletableFuture<JobVertexThreadInfoStats> requestFuture2 =
+ CompletableFuture<VertexThreadInfoStats> requestFuture2 =
coordinator.triggerThreadInfoRequest(
executionWithGateways,
DEFAULT_NUMBER_OF_SAMPLES,
@@ -204,7 +204,7 @@ public class ThreadInfoRequestCoordinatorTest extends TestLogger {
requestFutures.add(requestFuture1);
requestFutures.add(requestFuture2);
- for (CompletableFuture<JobVertexThreadInfoStats> future : requestFutures) {
+ for (CompletableFuture<VertexThreadInfoStats> future : requestFutures) {
assertThat(future).isNotDone();
}
@@ -212,12 +212,12 @@ public class ThreadInfoRequestCoordinatorTest extends TestLogger {
coordinator.shutDown();
// verify all completed
- for (CompletableFuture<JobVertexThreadInfoStats> future : requestFutures) {
+ for (CompletableFuture<VertexThreadInfoStats> future : requestFutures) {
assertThat(future).isCompletedExceptionally();
}
// verify new trigger returns failed future
- CompletableFuture<JobVertexThreadInfoStats> future =
+ CompletableFuture<VertexThreadInfoStats> future =
coordinator.triggerThreadInfoRequest(
executionWithGateways,
DEFAULT_NUMBER_OF_SAMPLES,