You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2023/01/05 11:24:37 UTC

[flink] 02/03: [FLINK-30185][rest] Add the subtask index parameter for flame graph handler

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e07aab138578d1f65f2d7260bd33391b15b2012a
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sat Dec 17 22:26:33 2022 +0800

    [FLINK-30185][rest] Add the subtask index parameter for flame graph handler
---
 .../src/test/resources/rest_api_v1.snapshot        |   3 +
 .../handler/job/JobVertexFlameGraphHandler.java    |  92 +++++++--
 .../messages/FlameGraphTypeQueryParameter.java     |   4 +-
 .../rest/messages/JobVertexFlameGraphHeaders.java  |   8 +-
 .../messages/JobVertexFlameGraphParameters.java    |   7 +-
 ...meters.java => SubtaskIndexQueryParameter.java} |  31 ++-
 .../runtime/webmonitor/WebMonitorEndpoint.java     |   4 +-
 .../threadinfo/JobVertexThreadInfoTracker.java     |  10 +-
 .../JobVertexThreadInfoTrackerBuilder.java         |   6 +-
 .../threadinfo/ThreadInfoRequestCoordinator.java   |  11 +-
 ...VertexFlameGraph.java => VertexFlameGraph.java} |  18 +-
 ...phFactory.java => VertexFlameGraphFactory.java} |  26 +--
 ...adInfoStats.java => VertexThreadInfoStats.java} |   9 +-
 .../job/JobVertexFlameGraphHandlerTest.java        | 216 +++++++++++++++++++++
 .../threadinfo/JobVertexThreadInfoTrackerTest.java |  67 +++----
 .../ThreadInfoRequestCoordinatorTest.java          |  20 +-
 16 files changed, 406 insertions(+), 126 deletions(-)

diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 9f1a3bec092..de462b9f204 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -2745,6 +2745,9 @@
       "queryParameters" : [ {
         "key" : "type",
         "mandatory" : false
+      }, {
+        "key" : "subtaskindex",
+        "mandatory" : false
       } ]
     },
     "request" : {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandler.java
index eb23f0f1436..516ab8bce91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -29,28 +30,32 @@ import org.apache.flink.runtime.rest.messages.FlameGraphTypeQueryParameter;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexFlameGraphHeaders;
 import org.apache.flink.runtime.rest.messages.JobVertexFlameGraphParameters;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexQueryParameter;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexFlameGraph;
-import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexFlameGraphFactory;
-import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoStats;
-import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTracker;
+import org.apache.flink.runtime.webmonitor.stats.JobVertexStatsTracker;
+import org.apache.flink.runtime.webmonitor.threadinfo.VertexFlameGraph;
+import org.apache.flink.runtime.webmonitor.threadinfo.VertexFlameGraphFactory;
+import org.apache.flink.runtime.webmonitor.threadinfo.VertexThreadInfoStats;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /** Request handler for the job vertex Flame Graph. */
 public class JobVertexFlameGraphHandler
-        extends AbstractJobVertexHandler<JobVertexFlameGraph, JobVertexFlameGraphParameters> {
+        extends AbstractJobVertexHandler<VertexFlameGraph, JobVertexFlameGraphParameters> {
 
-    private final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> threadInfoOperatorTracker;
+    private final JobVertexStatsTracker<VertexThreadInfoStats> threadInfoOperatorTracker;
 
     public JobVertexFlameGraphHandler(
             GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -58,7 +63,7 @@ public class JobVertexFlameGraphHandler
             Map<String, String> responseHeaders,
             ExecutionGraphCache executionGraphCache,
             Executor executor,
-            JobVertexThreadInfoTracker<JobVertexThreadInfoStats> threadInfoOperatorTracker) {
+            JobVertexStatsTracker<VertexThreadInfoStats> threadInfoOperatorTracker) {
         super(
                 leaderRetriever,
                 timeout,
@@ -70,34 +75,40 @@ public class JobVertexFlameGraphHandler
     }
 
     @Override
-    protected JobVertexFlameGraph handleRequest(
+    protected VertexFlameGraph handleRequest(
             HandlerRequest<EmptyRequestBody> request, AccessExecutionJobVertex jobVertex)
             throws RestHandlerException {
 
-        if (jobVertex.getAggregateState().isTerminal()) {
-            return JobVertexFlameGraph.terminated();
+        @Nullable Integer subtaskIndex = getSubtaskIndex(request, jobVertex);
+        if (isTerminated(jobVertex, subtaskIndex)) {
+            return VertexFlameGraph.terminated();
         }
 
-        final Optional<JobVertexThreadInfoStats> threadInfoSample =
+        Optional<VertexThreadInfoStats> threadInfoSample =
                 threadInfoOperatorTracker.getVertexStats(
                         request.getPathParameter(JobIDPathParameter.class), jobVertex);
 
+        if (subtaskIndex != null) {
+            threadInfoSample =
+                    threadInfoSample.map(generateThreadInfoStatsForSubtask(subtaskIndex));
+        }
+
         final FlameGraphTypeQueryParameter.Type flameGraphType = getFlameGraphType(request);
 
-        final Optional<JobVertexFlameGraph> operatorFlameGraph;
+        final Optional<VertexFlameGraph> operatorFlameGraph;
 
         switch (flameGraphType) {
             case FULL:
                 operatorFlameGraph =
-                        threadInfoSample.map(JobVertexFlameGraphFactory::createFullFlameGraphFrom);
+                        threadInfoSample.map(VertexFlameGraphFactory::createFullFlameGraphFrom);
                 break;
             case ON_CPU:
                 operatorFlameGraph =
-                        threadInfoSample.map(JobVertexFlameGraphFactory::createOnCpuFlameGraph);
+                        threadInfoSample.map(VertexFlameGraphFactory::createOnCpuFlameGraph);
                 break;
             case OFF_CPU:
                 operatorFlameGraph =
-                        threadInfoSample.map(JobVertexFlameGraphFactory::createOffCpuFlameGraph);
+                        threadInfoSample.map(VertexFlameGraphFactory::createOffCpuFlameGraph);
                 break;
             default:
                 throw new RestHandlerException(
@@ -105,7 +116,28 @@ public class JobVertexFlameGraphHandler
                         HttpResponseStatus.BAD_REQUEST);
         }
 
-        return operatorFlameGraph.orElse(JobVertexFlameGraph.waiting());
+        return operatorFlameGraph.orElse(VertexFlameGraph.waiting());
+    }
+
+    private Function<VertexThreadInfoStats, VertexThreadInfoStats>
+            generateThreadInfoStatsForSubtask(Integer subtaskIndex) {
+        return stats ->
+                new VertexThreadInfoStats(
+                        stats.getRequestId(),
+                        stats.getStartTime(),
+                        stats.getEndTime(),
+                        stats.getSamplesBySubtask().entrySet().stream()
+                                .filter(entry -> entry.getKey().getSubtaskIndex() == subtaskIndex)
+                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+    }
+
+    private boolean isTerminated(
+            AccessExecutionJobVertex jobVertex, @Nullable Integer subtaskIndex) {
+        if (subtaskIndex == null) {
+            return jobVertex.getAggregateState().isTerminal();
+        }
+        AccessExecutionVertex executionVertex = jobVertex.getTaskVertices()[subtaskIndex];
+        return executionVertex.getExecutionState().isTerminal();
     }
 
     private static FlameGraphTypeQueryParameter.Type getFlameGraphType(HandlerRequest<?> request) {
@@ -114,9 +146,27 @@ public class JobVertexFlameGraphHandler
 
         if (flameGraphTypeParameter.isEmpty()) {
             return FlameGraphTypeQueryParameter.Type.FULL;
-        } else {
-            return flameGraphTypeParameter.get(0);
         }
+        return flameGraphTypeParameter.get(0);
+    }
+
+    @Nullable
+    private static Integer getSubtaskIndex(
+            HandlerRequest<?> request, AccessExecutionJobVertex jobVertex)
+            throws RestHandlerException {
+        final List<Integer> subtaskIndexParameter =
+                request.getQueryParameter(SubtaskIndexQueryParameter.class);
+
+        if (subtaskIndexParameter.isEmpty()) {
+            return null;
+        }
+        int subtaskIndex = subtaskIndexParameter.get(0);
+        if (subtaskIndex >= jobVertex.getTaskVertices().length || subtaskIndex < 0) {
+            throw new RestHandlerException(
+                    "Invalid subtask index for vertex " + jobVertex.getJobVertexId(),
+                    HttpResponseStatus.NOT_FOUND);
+        }
+        return subtaskIndex;
     }
 
     @Override
@@ -135,7 +185,7 @@ public class JobVertexFlameGraphHandler
             extends AbstractRestHandler<
                     RestfulGateway,
                     EmptyRequestBody,
-                    JobVertexFlameGraph,
+                    VertexFlameGraph,
                     JobVertexFlameGraphParameters> {
         protected DisabledJobVertexFlameGraphHandler(
                 GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -149,10 +199,10 @@ public class JobVertexFlameGraphHandler
         }
 
         @Override
-        protected CompletableFuture<JobVertexFlameGraph> handleRequest(
+        protected CompletableFuture<VertexFlameGraph> handleRequest(
                 @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway)
                 throws RestHandlerException {
-            return CompletableFuture.completedFuture(JobVertexFlameGraph.disabled());
+            return CompletableFuture.completedFuture(VertexFlameGraph.disabled());
         }
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/FlameGraphTypeQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/FlameGraphTypeQueryParameter.java
index ae918fe0fd4..a0625394469 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/FlameGraphTypeQueryParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/FlameGraphTypeQueryParameter.java
@@ -24,10 +24,10 @@ import java.util.Arrays;
 public class FlameGraphTypeQueryParameter
         extends MessageQueryParameter<FlameGraphTypeQueryParameter.Type> {
 
-    private static final String key = "type";
+    public static final String KEY = "type";
 
     public FlameGraphTypeQueryParameter() {
-        super(key, MessageParameterRequisiteness.OPTIONAL);
+        super(KEY, MessageParameterRequisiteness.OPTIONAL);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphHeaders.java
index 401c2202a03..e605d2e5f40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphHeaders.java
@@ -20,14 +20,14 @@ package org.apache.flink.runtime.rest.messages;
 
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.handler.job.JobVertexFlameGraphHandler;
-import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexFlameGraph;
+import org.apache.flink.runtime.webmonitor.threadinfo.VertexFlameGraph;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Message headers for the {@link JobVertexFlameGraphHandler}. */
 public class JobVertexFlameGraphHeaders
         implements RuntimeMessageHeaders<
-                EmptyRequestBody, JobVertexFlameGraph, JobVertexFlameGraphParameters> {
+                EmptyRequestBody, VertexFlameGraph, JobVertexFlameGraphParameters> {
 
     private static final JobVertexFlameGraphHeaders INSTANCE = new JobVertexFlameGraphHeaders();
 
@@ -44,8 +44,8 @@ public class JobVertexFlameGraphHeaders
     }
 
     @Override
-    public Class<JobVertexFlameGraph> getResponseClass() {
-        return JobVertexFlameGraph.class;
+    public Class<VertexFlameGraph> getResponseClass() {
+        return VertexFlameGraph.class;
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphParameters.java
index 8b5bb945dd0..4fb34f7c3f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphParameters.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.rest.messages;
 
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 
 /** Message parameters for job vertex Flame Graph REST handler. */
 public class JobVertexFlameGraphParameters extends JobVertexMessageParameters {
@@ -27,8 +27,11 @@ public class JobVertexFlameGraphParameters extends JobVertexMessageParameters {
     public final FlameGraphTypeQueryParameter flameGraphTypeQueryParameter =
             new FlameGraphTypeQueryParameter();
 
+    public final SubtaskIndexQueryParameter subtaskIndexQueryParameter =
+            new SubtaskIndexQueryParameter();
+
     @Override
     public Collection<MessageQueryParameter<?>> getQueryParameters() {
-        return Collections.singleton(flameGraphTypeQueryParameter);
+        return Arrays.asList(flameGraphTypeQueryParameter, subtaskIndexQueryParameter);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexQueryParameter.java
similarity index 50%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphParameters.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexQueryParameter.java
index 8b5bb945dd0..08c1d0e3ff7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexQueryParameter.java
@@ -18,17 +18,32 @@
 
 package org.apache.flink.runtime.rest.messages;
 
-import java.util.Collection;
-import java.util.Collections;
+/** Query parameter specifying the index of a subtask. */
+public class SubtaskIndexQueryParameter extends MessageQueryParameter<Integer> {
 
-/** Message parameters for job vertex Flame Graph REST handler. */
-public class JobVertexFlameGraphParameters extends JobVertexMessageParameters {
+    public static final String KEY = "subtaskindex";
 
-    public final FlameGraphTypeQueryParameter flameGraphTypeQueryParameter =
-            new FlameGraphTypeQueryParameter();
+    public SubtaskIndexQueryParameter() {
+        super(KEY, MessageParameterRequisiteness.OPTIONAL);
+    }
+
+    @Override
+    public Integer convertStringToValue(String value) throws ConversionException {
+        final int subtaskIndex = Integer.parseInt(value);
+        if (subtaskIndex >= 0) {
+            return subtaskIndex;
+        } else {
+            throw new ConversionException("subtaskindex must be positive, was: " + subtaskIndex);
+        }
+    }
+
+    @Override
+    public String convertValueToString(Integer value) {
+        return value.toString();
+    }
 
     @Override
-    public Collection<MessageQueryParameter<?>> getQueryParameters() {
-        return Collections.singleton(flameGraphTypeQueryParameter);
+    public String getDescription() {
+        return "Positive integer value that identifies a subtask.";
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 259495a3f33..1ab048995aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -148,10 +148,10 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoStats;
 import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTracker;
 import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTrackerBuilder;
 import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoRequestCoordinator;
+import org.apache.flink.runtime.webmonitor.threadinfo.VertexThreadInfoStats;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
@@ -241,7 +241,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
         this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
     }
 
-    private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> initializeThreadInfoTracker(
+    private JobVertexThreadInfoTracker<VertexThreadInfoStats> initializeThreadInfoTracker(
             ScheduledExecutorService executor) {
         final Duration akkaTimeout = clusterConfiguration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
index 6933655c0b4..95350f72ae9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
@@ -77,7 +77,7 @@ public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVert
     @GuardedBy("lock")
     private final ThreadInfoRequestCoordinator coordinator;
 
-    private final Function<JobVertexThreadInfoStats, T> createStatsFn;
+    private final Function<VertexThreadInfoStats, T> createStatsFn;
 
     private final ExecutorService executor;
 
@@ -108,7 +108,7 @@ public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVert
     JobVertexThreadInfoTracker(
             ThreadInfoRequestCoordinator coordinator,
             GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
-            Function<JobVertexThreadInfoStats, T> createStatsFn,
+            Function<VertexThreadInfoStats, T> createStatsFn,
             ScheduledExecutorService executor,
             Duration cleanUpInterval,
             int numSamples,
@@ -193,7 +193,7 @@ public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVert
             final CompletableFuture<ResourceManagerGateway> gatewayFuture =
                     resourceManagerGatewayRetriever.getFuture();
 
-            CompletableFuture<JobVertexThreadInfoStats> sample =
+            CompletableFuture<VertexThreadInfoStats> sample =
                     gatewayFuture.thenCompose(
                             (ResourceManagerGateway resourceManagerGateway) ->
                                     coordinator.triggerThreadInfoRequest(
@@ -334,7 +334,7 @@ public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVert
 
     /** Callback on completed thread info sample. */
     private class ThreadInfoSampleCompletionCallback
-            implements BiConsumer<JobVertexThreadInfoStats, Throwable> {
+            implements BiConsumer<VertexThreadInfoStats, Throwable> {
 
         private final Key key;
         private final AccessExecutionJobVertex vertex;
@@ -345,7 +345,7 @@ public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVert
         }
 
         @Override
-        public void accept(JobVertexThreadInfoStats threadInfoStats, Throwable throwable) {
+        public void accept(VertexThreadInfoStats threadInfoStats, Throwable throwable) {
             synchronized (lock) {
                 try {
                     if (shutDown) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerBuilder.java
index 86e15334a05..530654fbcda 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerBuilder.java
@@ -42,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 public class JobVertexThreadInfoTrackerBuilder<T extends Statistics> {
 
     private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
-    private final Function<JobVertexThreadInfoStats, T> createStatsFn;
+    private final Function<VertexThreadInfoStats, T> createStatsFn;
     private final ScheduledExecutorService executor;
     private final Time restTimeout;
 
@@ -56,7 +56,7 @@ public class JobVertexThreadInfoTrackerBuilder<T extends Statistics> {
 
     JobVertexThreadInfoTrackerBuilder(
             GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
-            Function<JobVertexThreadInfoStats, T> createStatsFn,
+            Function<VertexThreadInfoStats, T> createStatsFn,
             ScheduledExecutorService executor,
             Time restTimeout) {
         this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
@@ -188,7 +188,7 @@ public class JobVertexThreadInfoTrackerBuilder<T extends Statistics> {
      */
     public static <T extends Statistics> JobVertexThreadInfoTrackerBuilder<T> newBuilder(
             GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
-            Function<JobVertexThreadInfoStats, T> createStatsFn,
+            Function<VertexThreadInfoStats, T> createStatsFn,
             ScheduledExecutorService executor,
             Time restTimeout) {
         return new JobVertexThreadInfoTrackerBuilder<>(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java
index eb0929124d5..8f080f41f8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinator.java
@@ -42,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /** A coordinator for triggering and collecting thread info stats of running job vertex subtasks. */
 public class ThreadInfoRequestCoordinator
         extends TaskStatsRequestCoordinator<
-                Map<ExecutionAttemptID, Collection<ThreadInfoSample>>, JobVertexThreadInfoStats> {
+                Map<ExecutionAttemptID, Collection<ThreadInfoSample>>, VertexThreadInfoStats> {
 
     /**
      * Creates a new coordinator for the job.
@@ -68,7 +68,7 @@ public class ThreadInfoRequestCoordinator
      *     samples.
      * @return A future of the completed thread info stats.
      */
-    public CompletableFuture<JobVertexThreadInfoStats> triggerThreadInfoRequest(
+    public CompletableFuture<VertexThreadInfoStats> triggerThreadInfoRequest(
             Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
                     executionsWithGateways,
             int numSamples,
@@ -161,8 +161,7 @@ public class ThreadInfoRequestCoordinator
 
     private static class PendingThreadInfoRequest
             extends PendingStatsRequest<
-                    Map<ExecutionAttemptID, Collection<ThreadInfoSample>>,
-                    JobVertexThreadInfoStats> {
+                    Map<ExecutionAttemptID, Collection<ThreadInfoSample>>, VertexThreadInfoStats> {
 
         PendingThreadInfoRequest(
                 int requestId, Collection<? extends Set<ExecutionAttemptID>> tasksToCollect) {
@@ -170,13 +169,13 @@ public class ThreadInfoRequestCoordinator
         }
 
         @Override
-        protected JobVertexThreadInfoStats assembleCompleteStats(long endTime) {
+        protected VertexThreadInfoStats assembleCompleteStats(long endTime) {
             HashMap<ExecutionAttemptID, Collection<ThreadInfoSample>> samples = new HashMap<>();
             for (Map<ExecutionAttemptID, Collection<ThreadInfoSample>> map :
                     statsResultByTaskGroup.values()) {
                 samples.putAll(map);
             }
-            return new JobVertexThreadInfoStats(requestId, startTime, endTime, samples);
+            return new VertexThreadInfoStats(requestId, startTime, endTime, samples);
         }
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexFlameGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraph.java
similarity index 90%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexFlameGraph.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraph.java
index 96fe0a21492..2bd160e186c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexFlameGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraph.java
@@ -28,12 +28,12 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import java.util.List;
 
 /**
- * Flame Graph representation for a job vertex.
+ * Flame Graph representation for a job vertex or an execution vertex.
  *
  * <p>Statistics are gathered by sampling stack traces of running tasks.
  */
 @JsonInclude(JsonInclude.Include.NON_NULL)
-public class JobVertexFlameGraph implements ResponseBody {
+public class VertexFlameGraph implements ResponseBody {
 
     private static final String FIELD_NAME_END_TIMESTAMP = "endTimestamp";
     private static final String FIELD_NAME_DATA = "data";
@@ -46,7 +46,7 @@ public class JobVertexFlameGraph implements ResponseBody {
     private final Node root;
 
     @JsonCreator
-    public JobVertexFlameGraph(
+    public VertexFlameGraph(
             @JsonProperty(FIELD_NAME_END_TIMESTAMP) long endTimestamp,
             @JsonProperty(FIELD_NAME_DATA) Node root) {
         this.endTimestamp = endTimestamp;
@@ -69,18 +69,18 @@ public class JobVertexFlameGraph implements ResponseBody {
     }
 
     // Indicates that the task execution has been terminated
-    public static JobVertexFlameGraph terminated() {
-        return new JobVertexFlameGraph(-1, null);
+    public static VertexFlameGraph terminated() {
+        return new VertexFlameGraph(-1, null);
     }
 
     // Indicates that the flame graph feature has been disabled
-    public static JobVertexFlameGraph disabled() {
-        return new JobVertexFlameGraph(-2, null);
+    public static VertexFlameGraph disabled() {
+        return new VertexFlameGraph(-2, null);
     }
 
     // Indicates that it is waiting for the first samples to creating the flame graph
-    public static JobVertexFlameGraph waiting() {
-        return new JobVertexFlameGraph(-3, null);
+    public static VertexFlameGraph waiting() {
+        return new VertexFlameGraph(-3, null);
     }
 
     /** Graph node. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexFlameGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java
similarity index 80%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexFlameGraphFactory.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java
index 07f98fe33b0..35f06663274 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexFlameGraphFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexFlameGraphFactory.java
@@ -30,21 +30,21 @@ import java.util.Map;
 import java.util.Set;
 
 /** Factory class for creating Flame Graph representations. */
-public class JobVertexFlameGraphFactory {
+public class VertexFlameGraphFactory {
 
     /**
-     * Converts {@link JobVertexThreadInfoStats} into a FlameGraph.
+     * Converts {@link VertexThreadInfoStats} into a FlameGraph.
      *
      * @param sample Thread details sample containing stack traces.
      * @return FlameGraph data structure
      */
-    public static JobVertexFlameGraph createFullFlameGraphFrom(JobVertexThreadInfoStats sample) {
+    public static VertexFlameGraph createFullFlameGraphFrom(VertexThreadInfoStats sample) {
         EnumSet<Thread.State> included = EnumSet.allOf(Thread.State.class);
         return createFlameGraphFromSample(sample, included);
     }
 
     /**
-     * Converts {@link JobVertexThreadInfoStats} into a FlameGraph representing blocked (Off-CPU)
+     * Converts {@link VertexThreadInfoStats} into a FlameGraph representing blocked (Off-CPU)
      * threads.
      *
      * <p>Includes threads in states Thread.State.[TIMED_WAITING, BLOCKED, WAITING].
@@ -52,14 +52,14 @@ public class JobVertexFlameGraphFactory {
      * @param sample Thread details sample containing stack traces.
      * @return FlameGraph data structure.
      */
-    public static JobVertexFlameGraph createOffCpuFlameGraph(JobVertexThreadInfoStats sample) {
+    public static VertexFlameGraph createOffCpuFlameGraph(VertexThreadInfoStats sample) {
         EnumSet<Thread.State> included =
                 EnumSet.of(Thread.State.TIMED_WAITING, Thread.State.BLOCKED, Thread.State.WAITING);
         return createFlameGraphFromSample(sample, included);
     }
 
     /**
-     * Converts {@link JobVertexThreadInfoStats} into a FlameGraph representing actively running
+     * Converts {@link VertexThreadInfoStats} into a FlameGraph representing actively running
      * (On-CPU) threads.
      *
      * <p>Includes threads in states Thread.State.[RUNNABLE, NEW].
@@ -67,13 +67,13 @@ public class JobVertexFlameGraphFactory {
      * @param sample Thread details sample containing stack traces.
      * @return FlameGraph data structure
      */
-    public static JobVertexFlameGraph createOnCpuFlameGraph(JobVertexThreadInfoStats sample) {
+    public static VertexFlameGraph createOnCpuFlameGraph(VertexThreadInfoStats sample) {
         EnumSet<Thread.State> included = EnumSet.of(Thread.State.RUNNABLE, Thread.State.NEW);
         return createFlameGraphFromSample(sample, included);
     }
 
-    private static JobVertexFlameGraph createFlameGraphFromSample(
-            JobVertexThreadInfoStats sample, Set<Thread.State> threadStates) {
+    private static VertexFlameGraph createFlameGraphFromSample(
+            VertexThreadInfoStats sample, Set<Thread.State> threadStates) {
         final NodeBuilder root = new NodeBuilder("root");
         for (Collection<ThreadInfoSample> threadInfoSubSamples :
                 sample.getSamplesBySubtask().values()) {
@@ -94,7 +94,7 @@ public class JobVertexFlameGraphFactory {
                 }
             }
         }
-        return new JobVertexFlameGraph(sample.getEndTime(), root.toNode());
+        return new VertexFlameGraph(sample.getEndTime(), root.toNode());
     }
 
     private static class NodeBuilder {
@@ -119,12 +119,12 @@ public class JobVertexFlameGraphFactory {
             hitCount++;
         }
 
-        private JobVertexFlameGraph.Node toNode() {
-            final List<JobVertexFlameGraph.Node> childrenNodes = new ArrayList<>(children.size());
+        private VertexFlameGraph.Node toNode() {
+            final List<VertexFlameGraph.Node> childrenNodes = new ArrayList<>(children.size());
             for (NodeBuilder builderChild : children.values()) {
                 childrenNodes.add(builderChild.toNode());
             }
-            return new JobVertexFlameGraph.Node(
+            return new VertexFlameGraph.Node(
                     stackTraceLocation, hitCount, Collections.unmodifiableList(childrenNodes));
         }
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexThreadInfoStats.java
similarity index 94%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoStats.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexThreadInfoStats.java
index f035bebc67b..d56945888c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/VertexThreadInfoStats.java
@@ -28,11 +28,8 @@ import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
-/**
- * Thread info statistics of multiple tasks. Each subtask can deliver multiple samples for
- * statistical purposes.
- */
-public class JobVertexThreadInfoStats implements Statistics {
+/** Thread info statistics of single JobVertex or ExecutionVertex. */
+public class VertexThreadInfoStats implements Statistics {
 
     /** ID of the corresponding request. */
     private final int requestId;
@@ -54,7 +51,7 @@ public class JobVertexThreadInfoStats implements Statistics {
      * @param endTime Timestamp, when all thread info samples were collected.
      * @param samplesBySubtask Map of thread info samples by subtask (execution ID).
      */
-    public JobVertexThreadInfoStats(
+    public VertexThreadInfoStats(
             int requestId,
             long startTime,
             long endTime,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandlerTest.java
new file mode 100644
index 00000000000..20e76022b3a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexFlameGraphHandlerTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionHistory;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.FlameGraphTypeQueryParameter;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexFlameGraphParameters;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexQueryParameter;
+import org.apache.flink.runtime.webmonitor.stats.JobVertexStatsTracker;
+import org.apache.flink.runtime.webmonitor.threadinfo.VertexFlameGraph;
+import org.apache.flink.runtime.webmonitor.threadinfo.VertexThreadInfoStats;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests of {@link JobVertexFlameGraphHandler}. */
+public class JobVertexFlameGraphHandlerTest extends TestLogger {
+
+    private static final JobID JOB_ID = new JobID();
+    private static final JobVertexID JOB_VERTEX_ID = new JobVertexID();
+
+    private static VertexThreadInfoStats taskThreadInfoStatsDefaultSample;
+    private static JobVertexFlameGraphHandler handler;
+
+    @BeforeAll
+    public static void setUp() {
+        taskThreadInfoStatsDefaultSample =
+                new VertexThreadInfoStats(
+                        8,
+                        System.currentTimeMillis(),
+                        System.currentTimeMillis() + 100,
+                        Collections.emptyMap());
+
+        final RestHandlerConfiguration restHandlerConfiguration =
+                RestHandlerConfiguration.fromConfiguration(new Configuration());
+        handler =
+                new JobVertexFlameGraphHandler(
+                        () -> null,
+                        Time.milliseconds(100L),
+                        Collections.emptyMap(),
+                        new DefaultExecutionGraphCache(
+                                restHandlerConfiguration.getTimeout(),
+                                Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),
+                        Executors.directExecutor(),
+                        new TestThreadInfoTracker(taskThreadInfoStatsDefaultSample));
+    }
+
+    /** Some subtasks are finished, some subtasks are running. */
+    @Test
+    void testHandleMixedSubtasks() throws Exception {
+        final ArchivedExecutionJobVertex archivedExecutionJobVertex =
+                new ArchivedExecutionJobVertex(
+                        new ArchivedExecutionVertex[] {
+                            generateExecutionVertex(0, ExecutionState.FINISHED),
+                            generateExecutionVertex(1, ExecutionState.RUNNING)
+                        },
+                        JOB_VERTEX_ID,
+                        "test",
+                        2,
+                        2,
+                        ResourceProfile.UNKNOWN,
+                        new StringifiedAccumulatorResult[0]);
+
+        // Check the finished subtask
+        HandlerRequest<EmptyRequestBody> request = generateJobVertexFlameGraphParameters(0);
+        VertexFlameGraph jobVertexFlameGraph =
+                handler.handleRequest(request, archivedExecutionJobVertex);
+        assertThat(jobVertexFlameGraph.getEndTime())
+                .isEqualTo(VertexFlameGraph.terminated().getEndTime());
+
+        // Check the running subtask
+        request = generateJobVertexFlameGraphParameters(1);
+        jobVertexFlameGraph = handler.handleRequest(request, archivedExecutionJobVertex);
+        assertThat(jobVertexFlameGraph.getEndTime())
+                .isEqualTo(taskThreadInfoStatsDefaultSample.getEndTime());
+
+        // Check the job vertex
+        request = generateJobVertexFlameGraphParameters(null);
+        jobVertexFlameGraph = handler.handleRequest(request, archivedExecutionJobVertex);
+        assertThat(jobVertexFlameGraph.getEndTime())
+                .isEqualTo(taskThreadInfoStatsDefaultSample.getEndTime());
+    }
+
+    /** All subtasks are finished. */
+    @Test
+    void testHandleFinishedJobVertex() throws Exception {
+        final ArchivedExecutionJobVertex archivedExecutionJobVertex =
+                new ArchivedExecutionJobVertex(
+                        new ArchivedExecutionVertex[] {
+                            generateExecutionVertex(0, ExecutionState.FINISHED),
+                            generateExecutionVertex(1, ExecutionState.FINISHED)
+                        },
+                        JOB_VERTEX_ID,
+                        "test",
+                        2,
+                        2,
+                        ResourceProfile.UNKNOWN,
+                        new StringifiedAccumulatorResult[0]);
+
+        HandlerRequest<EmptyRequestBody> request = generateJobVertexFlameGraphParameters(null);
+        VertexFlameGraph jobVertexFlameGraph =
+                handler.handleRequest(request, archivedExecutionJobVertex);
+        assertThat(jobVertexFlameGraph.getEndTime())
+                .isEqualTo(VertexFlameGraph.terminated().getEndTime());
+    }
+
+    private HandlerRequest<EmptyRequestBody> generateJobVertexFlameGraphParameters(
+            Integer subtaskIndex) throws HandlerRequestException {
+        final HashMap<String, String> receivedPathParameters = new HashMap<>(2);
+        receivedPathParameters.put(JobIDPathParameter.KEY, JOB_ID.toString());
+        receivedPathParameters.put(JobVertexIdPathParameter.KEY, JOB_VERTEX_ID.toString());
+
+        Map<String, List<String>> queryParams = new HashMap<>(2);
+        queryParams.put(
+                FlameGraphTypeQueryParameter.KEY,
+                Collections.singletonList(FlameGraphTypeQueryParameter.Type.FULL.name()));
+        if (subtaskIndex != null) {
+            queryParams.put(
+                    SubtaskIndexQueryParameter.KEY,
+                    Collections.singletonList(subtaskIndex.toString()));
+        }
+
+        return HandlerRequest.resolveParametersAndCreate(
+                EmptyRequestBody.getInstance(),
+                new JobVertexFlameGraphParameters(),
+                receivedPathParameters,
+                queryParams,
+                Collections.emptyList());
+    }
+
+    private ArchivedExecutionVertex generateExecutionVertex(
+            int subtaskIndex, ExecutionState executionState) {
+        return new ArchivedExecutionVertex(
+                subtaskIndex,
+                "test task",
+                new ArchivedExecution(
+                        new StringifiedAccumulatorResult[0],
+                        null,
+                        createExecutionAttemptId(JOB_VERTEX_ID, subtaskIndex, 0),
+                        executionState,
+                        null,
+                        null,
+                        null,
+                        new long[ExecutionState.values().length],
+                        new long[ExecutionState.values().length]),
+                new ExecutionHistory(0));
+    }
+
+    /**
+     * A {@link JobVertexStatsTracker} which returns the pre-generated thread info stats directly.
+     */
+    private static class TestThreadInfoTracker
+            implements JobVertexStatsTracker<VertexThreadInfoStats> {
+
+        private final VertexThreadInfoStats stats;
+
+        public TestThreadInfoTracker(VertexThreadInfoStats stats) {
+            this.stats = stats;
+        }
+
+        @Override
+        public Optional<VertexThreadInfoStats> getVertexStats(
+                JobID jobId, AccessExecutionJobVertex vertex) {
+            return Optional.of(stats);
+        }
+
+        @Override
+        public void shutDown() throws FlinkException {}
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
index d45aff28279..afd51257099 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
@@ -89,7 +89,7 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
                     .collect(Collectors.toSet());
     private static final JobID JOB_ID = new JobID();
 
-    private static JobVertexThreadInfoStats threadInfoStatsDefaultSample;
+    private static VertexThreadInfoStats threadInfoStatsDefaultSample;
 
     private static final Duration CLEAN_UP_INTERVAL = Duration.ofSeconds(60);
     private static final Duration STATS_REFRESH_INTERVAL = Duration.ofSeconds(60);
@@ -128,16 +128,16 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
     /** Tests that cached result is reused within refresh interval. */
     @Test
     public void testCachedStatsNotUpdatedWithinRefreshInterval() throws Exception {
-        final JobVertexThreadInfoStats unusedThreadInfoStats = createThreadInfoStats(1, TIME_GAP);
+        final VertexThreadInfoStats unusedThreadInfoStats = createThreadInfoStats(1, TIME_GAP);
 
-        final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
+        final JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker =
                 createThreadInfoTracker(
                         STATS_REFRESH_INTERVAL,
                         threadInfoStatsDefaultSample,
                         unusedThreadInfoStats);
         // stores threadInfoStatsDefaultSample in cache
         doInitialRequestAndVerifyResult(tracker);
-        Optional<JobVertexThreadInfoStats> result =
+        Optional<VertexThreadInfoStats> result =
                 tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
         // cached result is returned instead of unusedThreadInfoStats
         assertThat(result).isPresent().hasValue(threadInfoStatsDefaultSample);
@@ -149,19 +149,19 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
         final Duration shortRefreshInterval = Duration.ofMillis(1);
 
         // first entry is in the past, so refresh is triggered immediately upon fetching it
-        final JobVertexThreadInfoStats initialThreadInfoStats =
+        final VertexThreadInfoStats initialThreadInfoStats =
                 createThreadInfoStats(
                         Instant.now().minus(10, ChronoUnit.SECONDS),
                         REQUEST_ID,
                         Duration.ofMillis(5));
-        final JobVertexThreadInfoStats threadInfoStatsAfterRefresh =
+        final VertexThreadInfoStats threadInfoStatsAfterRefresh =
                 createThreadInfoStats(1, TIME_GAP);
 
         // register a CountDownLatch with the cache so we can await refresh of the entry
         CountDownLatch cacheRefreshed = new CountDownLatch(1);
-        Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> vertexStatsCache =
+        Cache<JobVertexThreadInfoTracker.Key, VertexThreadInfoStats> vertexStatsCache =
                 createCache(CLEAN_UP_INTERVAL, new LatchRemovalListener<>(cacheRefreshed));
-        final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
+        final JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker =
                 createThreadInfoTracker(
                         CLEAN_UP_INTERVAL,
                         shortRefreshInterval,
@@ -182,7 +182,7 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
         cacheRefreshed.await();
 
         // verify that we get the second result on the next request
-        Optional<JobVertexThreadInfoStats> result =
+        Optional<VertexThreadInfoStats> result =
                 tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
         assertExpectedEqualsReceived(threadInfoStatsAfterRefresh, result);
     }
@@ -194,9 +194,9 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
 
         // register a CountDownLatch with the cache so we can await expiry of the entry
         CountDownLatch cacheExpired = new CountDownLatch(1);
-        Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> vertexStatsCache =
+        Cache<JobVertexThreadInfoTracker.Key, VertexThreadInfoStats> vertexStatsCache =
                 createCache(shortCleanUpInterval, new LatchRemovalListener<>(cacheExpired));
-        final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
+        final JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker =
                 createThreadInfoTracker(
                         shortCleanUpInterval,
                         STATS_REFRESH_INTERVAL,
@@ -214,8 +214,7 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
     /** Tests that cached results are NOT removed within the cleanup interval. */
     @Test
     public void testCachedStatsNotCleanedWithinCleanupInterval() throws Exception {
-        final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
-                createThreadInfoTracker();
+        final JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker = createThreadInfoTracker();
 
         doInitialRequestAndVerifyResult(tracker);
 
@@ -228,8 +227,7 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
     /** Tests that cached results are not served after the shutdown. */
     @Test
     public void testShutDown() throws Exception {
-        final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
-                createThreadInfoTracker();
+        final JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker = createThreadInfoTracker();
         doInitialRequestAndVerifyResult(tracker);
 
         // shutdown directly
@@ -241,9 +239,9 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
         assertThat(tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX)).isNotPresent();
     }
 
-    private Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> createCache(
+    private Cache<JobVertexThreadInfoTracker.Key, VertexThreadInfoStats> createCache(
             Duration cleanUpInterval,
-            RemovalListener<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats>
+            RemovalListener<JobVertexThreadInfoTracker.Key, VertexThreadInfoStats>
                     removalListener) {
         return CacheBuilder.newBuilder()
                 .concurrencyLevel(1)
@@ -253,7 +251,7 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
     }
 
     private void doInitialRequestAndVerifyResult(
-            JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker)
+            JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker)
             throws InterruptedException, ExecutionException {
         // no stats yet, but the request triggers async collection of stats
         assertThat(tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX)).isNotPresent();
@@ -264,10 +262,9 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
     }
 
     private static void assertExpectedEqualsReceived(
-            JobVertexThreadInfoStats expected,
-            Optional<JobVertexThreadInfoStats> receivedOptional) {
+            VertexThreadInfoStats expected, Optional<VertexThreadInfoStats> receivedOptional) {
         assertThat(receivedOptional).isPresent();
-        JobVertexThreadInfoStats received = receivedOptional.get();
+        VertexThreadInfoStats received = receivedOptional.get();
 
         assertThat(expected.getRequestId()).isEqualTo(received.getRequestId());
         assertThat(expected.getEndTime()).isEqualTo(received.getEndTime());
@@ -279,20 +276,20 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
         }
     }
 
-    private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> createThreadInfoTracker() {
+    private JobVertexThreadInfoTracker<VertexThreadInfoStats> createThreadInfoTracker() {
         return createThreadInfoTracker(STATS_REFRESH_INTERVAL, threadInfoStatsDefaultSample);
     }
 
-    private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> createThreadInfoTracker(
-            Duration statsRefreshInterval, JobVertexThreadInfoStats... stats) {
+    private JobVertexThreadInfoTracker<VertexThreadInfoStats> createThreadInfoTracker(
+            Duration statsRefreshInterval, VertexThreadInfoStats... stats) {
         return createThreadInfoTracker(CLEAN_UP_INTERVAL, statsRefreshInterval, null, stats);
     }
 
-    private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> createThreadInfoTracker(
+    private JobVertexThreadInfoTracker<VertexThreadInfoStats> createThreadInfoTracker(
             Duration cleanUpInterval,
             Duration statsRefreshInterval,
-            Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> vertexStatsCache,
-            JobVertexThreadInfoStats... stats) {
+            Cache<JobVertexThreadInfoTracker.Key, VertexThreadInfoStats> vertexStatsCache,
+            VertexThreadInfoStats... stats) {
         final ThreadInfoRequestCoordinator coordinator =
                 new TestingThreadInfoRequestCoordinator(Runnable::run, REQUEST_TIMEOUT, stats);
 
@@ -311,11 +308,11 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
                 .build();
     }
 
-    private static JobVertexThreadInfoStats createThreadInfoStats(int requestId, Duration timeGap) {
+    private static VertexThreadInfoStats createThreadInfoStats(int requestId, Duration timeGap) {
         return createThreadInfoStats(Instant.now(), requestId, timeGap);
     }
 
-    private static JobVertexThreadInfoStats createThreadInfoStats(
+    private static VertexThreadInfoStats createThreadInfoStats(
             Instant startTime, int requestId, Duration timeGap) {
         Instant endTime = startTime.plus(timeGap);
 
@@ -331,7 +328,7 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
                     Collections.singletonList(threadInfoSample.get()));
         }
 
-        return new JobVertexThreadInfoStats(
+        return new VertexThreadInfoStats(
                 requestId, startTime.toEpochMilli(), endTime.toEpochMilli(), samples);
     }
 
@@ -372,19 +369,19 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
      */
     private static class TestingThreadInfoRequestCoordinator extends ThreadInfoRequestCoordinator {
 
-        private final JobVertexThreadInfoStats[] jobVertexThreadInfoStats;
+        private final VertexThreadInfoStats[] vertexThreadInfoStats;
         private int counter = 0;
 
         TestingThreadInfoRequestCoordinator(
                 Executor executor,
                 Duration requestTimeout,
-                JobVertexThreadInfoStats... jobVertexThreadInfoStats) {
+                VertexThreadInfoStats... vertexThreadInfoStats) {
             super(executor, requestTimeout);
-            this.jobVertexThreadInfoStats = jobVertexThreadInfoStats;
+            this.vertexThreadInfoStats = vertexThreadInfoStats;
         }
 
         @Override
-        public CompletableFuture<JobVertexThreadInfoStats> triggerThreadInfoRequest(
+        public CompletableFuture<VertexThreadInfoStats> triggerThreadInfoRequest(
                 Map<
                                 ImmutableSet<ExecutionAttemptID>,
                                 CompletableFuture<TaskExecutorThreadInfoGateway>>
@@ -396,7 +393,7 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
             assertThat(executionsWithGateways.keySet().iterator().next()).isEqualTo(ATTEMPT_IDS);
 
             return CompletableFuture.completedFuture(
-                    jobVertexThreadInfoStats[(counter++) % jobVertexThreadInfoStats.length]);
+                    vertexThreadInfoStats[(counter++) % vertexThreadInfoStats.length]);
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinatorTest.java
index 147195ea859..f937ccd1c5d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinatorTest.java
@@ -104,14 +104,14 @@ public class ThreadInfoRequestCoordinatorTest extends TestLogger {
                         createMockSubtaskWithGateways(
                                 CompletionType.SUCCESSFULLY, CompletionType.SUCCESSFULLY);
 
-        CompletableFuture<JobVertexThreadInfoStats> requestFuture =
+        CompletableFuture<VertexThreadInfoStats> requestFuture =
                 coordinator.triggerThreadInfoRequest(
                         executionWithGateways,
                         DEFAULT_NUMBER_OF_SAMPLES,
                         DEFAULT_DELAY_BETWEEN_SAMPLES,
                         DEFAULT_MAX_STACK_TRACE_DEPTH);
 
-        JobVertexThreadInfoStats threadInfoStats = requestFuture.get();
+        VertexThreadInfoStats threadInfoStats = requestFuture.get();
 
         // verify the request result
         assertThat(threadInfoStats.getRequestId()).isEqualTo(0);
@@ -133,7 +133,7 @@ public class ThreadInfoRequestCoordinatorTest extends TestLogger {
                         createMockSubtaskWithGateways(
                                 CompletionType.SUCCESSFULLY, CompletionType.EXCEPTIONALLY);
 
-        CompletableFuture<JobVertexThreadInfoStats> requestFuture =
+        CompletableFuture<VertexThreadInfoStats> requestFuture =
                 coordinator.triggerThreadInfoRequest(
                         executionWithGateways,
                         DEFAULT_NUMBER_OF_SAMPLES,
@@ -156,7 +156,7 @@ public class ThreadInfoRequestCoordinatorTest extends TestLogger {
                         createMockSubtaskWithGateways(
                                 CompletionType.SUCCESSFULLY, CompletionType.TIMEOUT);
 
-        CompletableFuture<JobVertexThreadInfoStats> requestFuture =
+        CompletableFuture<VertexThreadInfoStats> requestFuture =
                 coordinator.triggerThreadInfoRequest(
                         executionWithGateways,
                         DEFAULT_NUMBER_OF_SAMPLES,
@@ -184,16 +184,16 @@ public class ThreadInfoRequestCoordinatorTest extends TestLogger {
                         createMockSubtaskWithGateways(
                                 CompletionType.SUCCESSFULLY, CompletionType.TIMEOUT);
 
-        List<CompletableFuture<JobVertexThreadInfoStats>> requestFutures = new ArrayList<>();
+        List<CompletableFuture<VertexThreadInfoStats>> requestFutures = new ArrayList<>();
 
-        CompletableFuture<JobVertexThreadInfoStats> requestFuture1 =
+        CompletableFuture<VertexThreadInfoStats> requestFuture1 =
                 coordinator.triggerThreadInfoRequest(
                         executionWithGateways,
                         DEFAULT_NUMBER_OF_SAMPLES,
                         DEFAULT_DELAY_BETWEEN_SAMPLES,
                         DEFAULT_MAX_STACK_TRACE_DEPTH);
 
-        CompletableFuture<JobVertexThreadInfoStats> requestFuture2 =
+        CompletableFuture<VertexThreadInfoStats> requestFuture2 =
                 coordinator.triggerThreadInfoRequest(
                         executionWithGateways,
                         DEFAULT_NUMBER_OF_SAMPLES,
@@ -204,7 +204,7 @@ public class ThreadInfoRequestCoordinatorTest extends TestLogger {
         requestFutures.add(requestFuture1);
         requestFutures.add(requestFuture2);
 
-        for (CompletableFuture<JobVertexThreadInfoStats> future : requestFutures) {
+        for (CompletableFuture<VertexThreadInfoStats> future : requestFutures) {
             assertThat(future).isNotDone();
         }
 
@@ -212,12 +212,12 @@ public class ThreadInfoRequestCoordinatorTest extends TestLogger {
         coordinator.shutDown();
 
         // verify all completed
-        for (CompletableFuture<JobVertexThreadInfoStats> future : requestFutures) {
+        for (CompletableFuture<VertexThreadInfoStats> future : requestFutures) {
             assertThat(future).isCompletedExceptionally();
         }
 
         // verify new trigger returns failed future
-        CompletableFuture<JobVertexThreadInfoStats> future =
+        CompletableFuture<VertexThreadInfoStats> future =
                 coordinator.triggerThreadInfoRequest(
                         executionWithGateways,
                         DEFAULT_NUMBER_OF_SAMPLES,