Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/15 10:31:12 UTC

[GitHub] [flink] xintongsong commented on a diff in pull request #21447: [FLINK-30185] Provide the flame graph to the subtask level

xintongsong commented on code in PR #21447:
URL: https://github.com/apache/flink/pull/21447#discussion_r1049319867


##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java:
##########
@@ -151,60 +146,88 @@
     }
 
     @Override
-    public Optional<T> getVertexStats(JobID jobId, AccessExecutionJobVertex vertex) {
+    public Optional<ThreadInfoStats> getVertexStats(JobID jobId, AccessExecutionJobVertex vertex) {
         synchronized (lock) {
-            final Key key = getKey(jobId, vertex);
+            List<AccessExecutionVertex> needRefreshedExecutionVertices = new ArrayList<>();
+            List<ThreadInfoSample> results = new ArrayList<>();
+
+            int requestId = Integer.MAX_VALUE;
+            long startTime = Long.MAX_VALUE;
+            long endTime = Long.MIN_VALUE;
+            for (AccessExecutionVertex executionVertex : vertex.getTaskVertices()) {
+                Key key = getKey(jobId, executionVertex);
+                final ThreadInfoStats stats = executionVertexStatsCache.getIfPresent(key);
+                if (stats != null) {
+                    results.addAll(stats.getSamples());
+                    requestId = Math.min(requestId, stats.getRequestId());
+                    startTime = Math.min(startTime, stats.getStartTime());
+                    endTime = Math.max(endTime, stats.getEndTime());

Review Comment:
   Not sure about these aggregations.
   - For `requestId`, there is no longer a 1-1 mapping between the pending requests and the thread info stats.
   - For `start/endTime`, it does not sound right to sample a group of tasks with different start / end times.
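
To make the concern concrete, here is a minimal sketch of the min/max aggregation from the hunk above, applied to two made-up subtasks. `SubtaskStats` is a hypothetical stand-in, not Flink's `ThreadInfoStats`, and the numbers are invented for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

class AggregationSketch {

    /** Hypothetical stand-in for per-subtask stats; NOT Flink's ThreadInfoStats. */
    static final class SubtaskStats {
        final int requestId;
        final long startTime;
        final long endTime;

        SubtaskStats(int requestId, long startTime, long endTime) {
            this.requestId = requestId;
            this.startTime = startTime;
            this.endTime = endTime;
        }
    }

    /** Mirrors the min/max aggregation in the diff; returns {requestId, start, end}. */
    static long[] aggregate(List<SubtaskStats> all) {
        int requestId = Integer.MAX_VALUE;
        long start = Long.MAX_VALUE;
        long end = Long.MIN_VALUE;
        for (SubtaskStats s : all) {
            requestId = Math.min(requestId, s.requestId);
            start = Math.min(start, s.startTime);
            end = Math.max(end, s.endTime);
        }
        return new long[] {requestId, start, end};
    }

    public static void main(String[] args) {
        List<SubtaskStats> all = new ArrayList<>();
        all.add(new SubtaskStats(7, 1_000L, 1_200L)); // sampled for 200 ms
        all.add(new SubtaskStats(9, 5_000L, 5_300L)); // sampled for 300 ms

        long[] agg = aggregate(all);
        // The merged result claims request 7 and a 4300 ms window,
        // although no subtask was sampled for longer than 300 ms.
        System.out.println("requestId=" + agg[0] + " window=" + (agg[2] - agg[1]) + "ms");
    }
}
```

In this sketch the merged `requestId` belongs to only one of the two requests, and the merged window covers a gap during which nothing was sampled.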



##########
flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmUtils.java:
##########
@@ -53,29 +56,32 @@ public static Collection<ThreadInfo> createThreadDump() {
      * Creates a {@link ThreadInfoSample} for a specific thread. Contains thread traces if
      * maxStackTraceDepth > 0.
      *
+     * @param executionId The execution id of this threadInfo.
      * @param threadId The ID of the thread to create the thread dump for.
      * @param maxStackTraceDepth The maximum number of entries in the stack trace to be collected.
      * @return The thread information of a specific thread.
      */
     public static Optional<ThreadInfoSample> createThreadInfoSample(
-            long threadId, int maxStackTraceDepth) {
+            ExecutionAttemptID executionId, long threadId, int maxStackTraceDepth) {

Review Comment:
   A `JvmUtils` should not be aware of the Flink DAG concepts such as `ExecutionAttemptID` and `SampleableTask`.
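
One possible shape for this separation, as a rough sketch only: the JVM-level helper takes just a thread ID and a depth, and the caller keeps the association with an execution. The class and method names below are hypothetical, and a plain `String` stands in for `ExecutionAttemptID`; only the `java.lang.management` calls are real JDK API.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class ThreadSamplingSketch {

    /** JVM-only helper: knows thread IDs and stack depth, nothing about the DAG. */
    static Optional<ThreadInfo> sampleThread(long threadId, int maxStackTraceDepth) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        // getThreadInfo returns null when the thread is not alive or does not exist.
        return Optional.ofNullable(bean.getThreadInfo(threadId, maxStackTraceDepth));
    }

    /** Caller side (hypothetical): owns the mapping from execution id to thread id. */
    static Map<String, ThreadInfo> sampleAll(Map<String, Long> threadIdsByExecution, int depth) {
        Map<String, ThreadInfo> result = new HashMap<>();
        threadIdsByExecution.forEach(
                (executionId, threadId) ->
                        sampleThread(threadId, depth)
                                .ifPresent(info -> result.put(executionId, info)));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> threads = new HashMap<>();
        threads.put("attempt-1", Thread.currentThread().getId());
        Map<String, ThreadInfo> samples = sampleAll(threads, 5);
        System.out.println(samples.get("attempt-1").getThreadName());
    }
}
```

With this split, the utility stays reusable for any thread, and only the calling code needs to know which thread belongs to which execution.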



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java:
##########
@@ -32,25 +36,33 @@
  */
 public class ThreadInfoSample implements Serializable {
 
+    private final ExecutionAttemptID executionId;

Review Comment:
   Not sure about including the `executionId` in `ThreadInfoSample`. I think it changes `ThreadInfoSample` from a generalized class that wraps parts of `java.lang.management.ThreadInfo` into a specialized class that only serves a Flink execution.
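
As an illustration of the alternative, the sample type could stay generic while the grouping by execution lives in a separate container. This is a sketch under assumptions: `Sample` and `SamplesByExecution` are hypothetical names, and a `String` key stands in for `ExecutionAttemptID`.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

class KeyedSamplesSketch {

    /** Generic sample: wraps parts of ThreadInfo and carries no execution-specific field. */
    static final class Sample {
        final Thread.State state;
        final StackTraceElement[] stackTrace;

        Sample(ThreadInfo info) {
            this.state = info.getThreadState();
            this.stackTrace = info.getStackTrace();
        }
    }

    /** Execution-specific grouping kept outside the sample (hypothetical container). */
    static final class SamplesByExecution {
        private final Map<String, Collection<Sample>> samples = new HashMap<>();

        void add(String executionId, Sample sample) {
            samples.computeIfAbsent(executionId, k -> new ArrayList<>()).add(sample);
        }

        Collection<Sample> get(String executionId) {
            return samples.getOrDefault(executionId, new ArrayList<>());
        }
    }

    public static void main(String[] args) {
        ThreadInfo info =
                ManagementFactory.getThreadMXBean()
                        .getThreadInfo(Thread.currentThread().getId(), 3);
        SamplesByExecution grouped = new SamplesByExecution();
        grouped.add("attempt-1", new Sample(info));
        System.out.println(grouped.get("attempt-1").size() + " sample(s) for attempt-1");
    }
}
```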


