Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/08 12:52:48 UTC

[GitHub] [flink] AHeise commented on a diff in pull request #19228: [FLINK-26074] Improve FlameGraphs scalability for high parallelism jobs

AHeise commented on code in PR #19228:
URL: https://github.com/apache/flink/pull/19228#discussion_r845810559


##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java:
##########
@@ -54,6 +61,27 @@ private ThreadInfoSample(Thread.State threadState, StackTraceElement[] stackTrac
         }
     }
 
+    /**
+     * Constructs a collection of {@link ThreadInfoSample}s from a {@link ThreadInfo} array.
+     *
+     * @param threadInfos {@link ThreadInfo} array where the data will be copied from.
+     * @return a Collection of the corresponding {@link ThreadInfoSample}s
+     */
+    public static Collection<ThreadInfoSample> from(ThreadInfo[] threadInfos) {
+        Collection<ThreadInfoSample> result = new ArrayList<>();
+        for (ThreadInfo threadInfo : threadInfos) {
+            if (threadInfo == null) {
+                LOG.warn("Missing thread info.");

Review Comment:
   I'd handle this case at the call site, where you can log more information.
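A minimal sketch of what handling this at the call site could look like. It assumes the caller still holds the requested thread IDs and relies on `ThreadMXBean.getThreadInfo(long[], int)` returning a `null` entry for every ID that has no live thread. The class and method names are hypothetical, not Flink code.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

final class CallSiteNullHandling {
    /** Returns the requested IDs for which the JVM reported no thread info. */
    static List<Long> missingThreadIds(long[] threadIds, ThreadInfo[] threadInfos) {
        List<Long> missing = new ArrayList<>();
        for (int i = 0; i < threadInfos.length; i++) {
            if (threadInfos[i] == null) {
                // The caller knows which ID was requested, so it can name it in the log.
                missing.add(threadIds[i]);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        long bogusId = Long.MAX_VALUE; // assumed not to belong to any live thread
        long[] ids = {Thread.currentThread().getId(), bogusId};
        ThreadInfo[] infos = bean.getThreadInfo(ids, 8);
        System.out.println("Missing thread info for IDs: " + missingThreadIds(ids, infos));
    }
}
```

With this split, the conversion method itself would not need to log anything.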



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java:
##########
@@ -54,6 +61,27 @@ private ThreadInfoSample(Thread.State threadState, StackTraceElement[] stackTrac
         }
     }
 
+    /**
+     * Constructs a collection of {@link ThreadInfoSample}s from a {@link ThreadInfo} array.
+     *
+     * @param threadInfos {@link ThreadInfo} array where the data will be copied from.
+     * @return a Collection of the corresponding {@link ThreadInfoSample}s
+     */
+    public static Collection<ThreadInfoSample> from(ThreadInfo[] threadInfos) {

Review Comment:
   Question of taste: this could be rewritten with Java streams. (For simple transformations, I prefer them because they express the intent better.)
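For illustration, a stream-based variant might look roughly like the sketch below. `Sample` is a hypothetical stand-in for `ThreadInfoSample`, and dropping `null` entries silently is an assumption (it presumes missing thread infos are reported by the caller instead).

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

final class StreamConversionSketch {
    /** Hypothetical stand-in for Flink's ThreadInfoSample. */
    static final class Sample {
        final Thread.State threadState;
        final StackTraceElement[] stackTrace;

        Sample(ThreadInfo threadInfo) {
            this.threadState = threadInfo.getThreadState();
            this.stackTrace = threadInfo.getStackTrace();
        }
    }

    /** Converts every non-null ThreadInfo into a Sample, preserving array order. */
    static List<Sample> from(ThreadInfo[] threadInfos) {
        return Arrays.stream(threadInfos)
                .filter(Objects::nonNull) // null entries mean the thread no longer exists
                .map(Sample::new)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long[] ids = {Thread.currentThread().getId()};
        ThreadInfo[] infos = ManagementFactory.getThreadMXBean().getThreadInfo(ids, 8);
        System.out.println("Converted samples: " + from(infos).size());
    }
}
```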



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##########
@@ -545,23 +545,29 @@ private void stopTaskExecutorServices() throws Exception {
 
     @Override
     public CompletableFuture<TaskThreadInfoResponse> requestThreadInfoSamples(
-            final ExecutionAttemptID taskExecutionAttemptId,
+            final Set<ExecutionAttemptID> taskExecutionAttemptIds,
             final ThreadInfoSamplesRequest requestParams,
             final Time timeout) {
 
-        final Task task = taskSlotTable.getTask(taskExecutionAttemptId);
-        if (task == null) {
-            return FutureUtils.completedExceptionally(
-                    new IllegalStateException(
-                            String.format(
-                                    "Cannot sample task %s. "
-                                            + "Task is not known to the task manager.",
-                                    taskExecutionAttemptId)));
+        final Collection<Task> tasks = new ArrayList<>();
+        for (ExecutionAttemptID executionAttemptId : taskExecutionAttemptIds) {
+            final Task task = taskSlotTable.getTask(executionAttemptId);
+            if (task == null) {
+                log.warn(
+                        String.format(
+                                "Cannot sample task %s. "
+                                        + "Task is not known to the task manager.",
+                                executionAttemptId));
+            } else {
+                tasks.add(task);
+            }
         }
 
+        Set<SampleableTask> sampleableTasks =

Review Comment:
   Same remark about `Set` as for the `taskExecutionAttemptIds` parameter: a plain `Collection` would do here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleService.java:
##########
@@ -71,34 +74,44 @@
     }
 
     private void requestThreadInfoSamples(
-            final SampleableTask task,
+            final Collection<SampleableTask> tasks,
             final int numSamples,
             final Duration delayBetweenSamples,
             final int maxStackTraceDepth,
             final List<ThreadInfoSample> currentTraces,
             final CompletableFuture<List<ThreadInfoSample>> resultFuture) {
 
-        final long threadId = task.getExecutingThread().getId();
-        final Optional<ThreadInfoSample> threadInfoSample =
-                JvmUtils.createThreadInfoSample(threadId, maxStackTraceDepth);
+        final Collection<Long> threadIds =
+                tasks.stream()
+                        .map(t -> t.getExecutingThread().getId())
+                        .collect(Collectors.toList());
 
-        if (threadInfoSample.isPresent()) {
-            currentTraces.add(threadInfoSample.get());
+        final Collection<ThreadInfoSample> threadInfoSample =
+                JvmUtils.createThreadInfoSample(threadIds, maxStackTraceDepth);
+
+        if (!threadInfoSample.isEmpty()) {
+            currentTraces.addAll(threadInfoSample);
         } else if (!currentTraces.isEmpty()) {
+            // Requested tasks are not running anymore, completing with whatever was collected by
+            // now.
             resultFuture.complete(currentTraces);
         } else {
+            final String ids =
+                    tasks.stream()
+                            .map(SampleableTask::getExecutionId)
+                            .map(e -> e == null ? "unknown" : e.toString())

Review Comment:
   When is `e == null`? Can we filter those out earlier?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ThreadInfoSampleService.java:
##########
@@ -71,34 +74,44 @@
     }
 
     private void requestThreadInfoSamples(
-            final SampleableTask task,
+            final Collection<SampleableTask> tasks,
             final int numSamples,
             final Duration delayBetweenSamples,
             final int maxStackTraceDepth,
             final List<ThreadInfoSample> currentTraces,
             final CompletableFuture<List<ThreadInfoSample>> resultFuture) {
 
-        final long threadId = task.getExecutingThread().getId();
-        final Optional<ThreadInfoSample> threadInfoSample =
-                JvmUtils.createThreadInfoSample(threadId, maxStackTraceDepth);
+        final Collection<Long> threadIds =
+                tasks.stream()
+                        .map(t -> t.getExecutingThread().getId())
+                        .collect(Collectors.toList());
 
-        if (threadInfoSample.isPresent()) {
-            currentTraces.add(threadInfoSample.get());
+        final Collection<ThreadInfoSample> threadInfoSample =
+                JvmUtils.createThreadInfoSample(threadIds, maxStackTraceDepth);
+
+        if (!threadInfoSample.isEmpty()) {
+            currentTraces.addAll(threadInfoSample);
         } else if (!currentTraces.isEmpty()) {
+            // Requested tasks are not running anymore, completing with whatever was collected by
+            // now.
             resultFuture.complete(currentTraces);

Review Comment:
   Shouldn't this branch also short-circuit with `return` so that we don't re-enqueue more samples?
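A simplified sketch of the control flow in question. It assumes the method goes on to schedule another sampling round after this `if`/`else` chain, which the hunk does not show. Strings stand in for samples, and the boolean return value stands in for the scheduling call; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class EarlyReturnSketch {
    /** Runs one sampling round; returns true if a follow-up round would be scheduled. */
    static boolean sampleOnce(
            List<String> newSamples,
            int numSamples,
            List<String> currentTraces,
            CompletableFuture<List<String>> resultFuture) {
        if (!newSamples.isEmpty()) {
            currentTraces.addAll(newSamples);
        } else if (!currentTraces.isEmpty()) {
            // Tasks are gone: complete with what we have and stop here.
            resultFuture.complete(currentTraces);
            return false; // without this, control falls through to the rescheduling below
        } else {
            resultFuture.completeExceptionally(new IllegalStateException("Nothing sampled."));
            return false;
        }

        if (numSamples > 1) {
            return true; // stand-in for scheduling the next round with numSamples - 1
        }
        resultFuture.complete(currentTraces);
        return false;
    }

    public static void main(String[] args) {
        List<String> traces = new ArrayList<>(Arrays.asList("trace-1"));
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        boolean rescheduled = sampleOnce(Collections.emptyList(), 5, traces, future);
        // prints "done=true, rescheduled=false"
        System.out.println("done=" + future.isDone() + ", rescheduled=" + rescheduled);
    }
}
```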



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##########
@@ -545,23 +545,29 @@ private void stopTaskExecutorServices() throws Exception {
 
     @Override
     public CompletableFuture<TaskThreadInfoResponse> requestThreadInfoSamples(
-            final ExecutionAttemptID taskExecutionAttemptId,
+            final Set<ExecutionAttemptID> taskExecutionAttemptIds,

Review Comment:
   `Set` always implies some overhead to deduplicate. The overhead is always worth it for user-facing things, but we could go with a simple `Collection` here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoStats.java:
##########
@@ -44,7 +45,7 @@
     private final long endTime;
 
     /** Map of thread info samples by execution ID. */
-    private final Map<ExecutionAttemptID, List<ThreadInfoSample>> samplesBySubtask;
+    private final Map<Set<ExecutionAttemptID>, List<ThreadInfoSample>> samplesBySubtask;

Review Comment:
   Does it make sense to retain the original key type? E.g., you could flatten the set into multiple entries with `putAll`. A `Collection` as a map key is prone to surprises.
   You could then probably retain the original `matchExecutionsWithGateways` implementation (not sure though).
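A small sketch of both points, with strings standing in for `ExecutionAttemptID` and `ThreadInfoSample`. It assumes each batched response can already be expressed as a map keyed by individual attempt ID; `mergeResponses` is a hypothetical helper, not part of the PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class FlatKeysSketch {
    /** Merges per-response maps into one map keyed by individual ID. */
    static <K, V> Map<K, List<V>> mergeResponses(List<Map<K, List<V>>> responses) {
        Map<K, List<V>> merged = new HashMap<>();
        for (Map<K, List<V>> response : responses) {
            merged.putAll(response); // one entry per ID, so the key type stays simple
        }
        return merged;
    }

    public static void main(String[] args) {
        // The surprise: mutating a Set after using it as a key changes its hashCode,
        // so a later lookup with the same object usually no longer finds the entry.
        Set<String> key = new HashSet<>(Arrays.asList("attempt-1"));
        Map<Set<String>, List<String>> grouped = new HashMap<>();
        grouped.put(key, Arrays.asList("sample-a"));
        key.add("attempt-2");
        System.out.println("Lookup after mutating the key: " + grouped.get(key));

        // The flat alternative: each response is keyed by individual ID.
        Map<String, List<String>> first = new HashMap<>();
        first.put("attempt-1", Arrays.asList("sample-a"));
        first.put("attempt-2", Arrays.asList("sample-b"));
        Map<String, List<String>> second = new HashMap<>();
        second.put("attempt-3", Arrays.asList("sample-c"));
        System.out.println("Merged IDs: " + mergeResponses(Arrays.asList(first, second)).keySet());
    }
}
```

Lookups by a single attempt ID then work directly, without first having to find the set that contains it.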


