Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/02 09:28:02 UTC

[GitHub] [flink] AHeise commented on a diff in pull request #19228: [FLINK-26074] Improve FlameGraphs scalability for high parallelism jobs

AHeise commented on code in PR #19228:
URL: https://github.com/apache/flink/pull/19228#discussion_r862691155


##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java:
##########
@@ -67,11 +67,15 @@ public static Optional<ThreadInfoSample> from(@Nullable ThreadInfo threadInfo) {
      * @param threadInfos {@link ThreadInfo} array where the data will be copied from.
      * @return a Collection of the corresponding {@link ThreadInfoSample}s
      */
-    public static Collection<ThreadInfoSample> from(ThreadInfo[] threadInfos) {

Review Comment:
   ```
       public StackTraceElement[] getStackTrace() {
           return stackTrace.clone();
       }
   ```
   Why do we clone here?
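   
   For context, one plausible reason for a `clone()` in a getter is a defensive copy, so callers cannot mutate the array held by the object. Whether that protection is needed here is exactly the open question. A minimal sketch, assuming a hypothetical `Sample` class (a stand-in, not the actual `ThreadInfoSample`):
   
   ```java
   /** Illustrates what a cloning getter protects against; all names are hypothetical. */
   public class DefensiveCopyDemo {
   
       /** Hypothetical stand-in for a class that holds a stack trace. */
       static final class Sample {
           private final StackTraceElement[] stackTrace;
   
           Sample(StackTraceElement[] stackTrace) {
               // Copy on the way in so later changes to the caller's array are not visible here.
               this.stackTrace = stackTrace.clone();
           }
   
           StackTraceElement[] getStackTrace() {
               // Copy on the way out so callers cannot modify the internal array.
               return stackTrace.clone();
           }
       }
   
       public static void main(String[] args) {
           StackTraceElement frame = new StackTraceElement("Demo", "run", "Demo.java", 1);
           Sample sample = new Sample(new StackTraceElement[] {frame});
   
           StackTraceElement[] view = sample.getStackTrace();
           view[0] = null; // only mutates the caller's copy
   
           System.out.println(sample.getStackTrace()[0] != null); // prints "true"
       }
   }
   ```
   
   If the array never escapes to code that mutates it, the copy is pure overhead on every call, which matters when many samples are processed.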



##########
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinatorTest.java:
##########
@@ -227,10 +229,17 @@ private static CompletableFuture<TaskExecutorThreadInfoGateway> createMockTaskMa
         final CompletableFuture<TaskThreadInfoResponse> responseFuture = new CompletableFuture<>();
         switch (completionType) {
             case SUCCESSFULLY:
-                ThreadInfoSample sample =
-                        JvmUtils.createThreadInfoSample(Thread.currentThread().getId(), 100).get();
+                List<IdleTestTask> tasks = new ArrayList<>();
+                tasks.add(new IdleTestTask());
+                tasks.add(new IdleTestTask());
+                List<Long> threadIds =
+                        tasks.stream()
+                                .map(t -> t.getExecutingThread().getId())
+                                .collect(Collectors.toList());
+                Collection<ThreadInfoSample> threadInfoSample =
+                        JvmUtils.createThreadInfoSample(threadIds, 100);
                 responseFuture.complete(
-                        new TaskThreadInfoResponse(Collections.singletonList(sample)));
+                        new TaskThreadInfoResponse(new ArrayList<>(threadInfoSample)));

Review Comment:
   Can you align the types so that no conversion is necessary (either always `Collection` or always `List`)?
   A conversion from collection to list is a code smell: either you need the order or you don't, and once the order is lost there is no way back.
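   
   A minimal sketch of what aligned types could look like, assuming `List` is chosen on both sides. The names `createSamples` and `Response` are hypothetical stand-ins, not the signatures in this PR:
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   
   /** Producer and consumer agree on List, so the caller passes the result straight through. */
   public class AlignedTypesDemo {
   
       /** Hypothetical producer: returns a List because results follow the order of the ids. */
       static List<String> createSamples(List<Long> threadIds) {
           List<String> samples = new ArrayList<>(threadIds.size());
           for (long id : threadIds) {
               samples.add("sample-" + id);
           }
           return samples;
       }
   
       /** Hypothetical consumer: accepts the same type the producer returns. */
       static final class Response {
           private final List<String> samples;
   
           Response(List<String> samples) {
               this.samples = samples;
           }
   
           List<String> getSamples() {
               return samples;
           }
       }
   
       public static void main(String[] args) {
           List<String> samples = createSamples(Arrays.asList(1L, 2L));
           Response response = new Response(samples); // no new ArrayList<>(...) wrapper needed
           System.out.println(response.getSamples()); // prints "[sample-1, sample-2]"
       }
   }
   ```
   
   The same works with `Collection` everywhere if the order is irrelevant; the point is only that both ends declare the same type.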



##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java:
##########
@@ -204,31 +206,33 @@ private void triggerThreadInfoSampleInternal(
         }
     }
 
-    private Map<Set<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
+    private Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>

Review Comment:
   I think it makes it much clearer to use `ImmutableSet` for keys. So I'd keep the change.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinatorTest.java:
##########
@@ -265,15 +267,20 @@ private static CompletableFuture<TaskExecutorThreadInfoGateway> createMockTaskMa
         return CompletableFuture.completedFuture(executorGateway);
     }
 
-    private static Map<Set<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
+    private static Map<
+                    ImmutableSet<ExecutionAttemptID>,
+                    CompletableFuture<TaskExecutorThreadInfoGateway>>
             createMockSubtaskWithGateways(CompletionType... completionTypes) {
-        final Map<Set<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
+        final Map<
+                        ImmutableSet<ExecutionAttemptID>,
+                        CompletableFuture<TaskExecutorThreadInfoGateway>>
                 result = new HashMap<>();
         for (CompletionType completionType : completionTypes) {
             Set<ExecutionAttemptID> attemptIds = new HashSet<>();

Review Comment:
   You should be able to avoid `attemptIds` entirely. Can't you use `ImmutableSet.of(new ExecutionAttemptID(), new ExecutionAttemptID())`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java:
##########
@@ -204,31 +206,33 @@ private void triggerThreadInfoSampleInternal(
         }
     }
 
-    private Map<Set<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
+    private Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
             matchExecutionsWithGateways(
                     AccessExecutionVertex[] executionVertices,
                     ResourceManagerGateway resourceManagerGateway) {
 
         // Group executions by their TaskManagerLocation to be able to issue one sampling
         // request per TaskManager for all relevant tasks at once
-        final Map<TaskManagerLocation, Set<ExecutionAttemptID>> executionsByLocation =
+        final Map<TaskManagerLocation, ImmutableSet<ExecutionAttemptID>> executionsByLocation =

Review Comment:
   Strictly speaking, this is not needed, but it may make things easier, so I'm fine either way. I just wanted to make it explicit. It is certainly better than using `? extends Set` here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java:
##########
@@ -67,11 +67,15 @@ public static Optional<ThreadInfoSample> from(@Nullable ThreadInfo threadInfo) {
      * @param threadInfos {@link ThreadInfo} array where the data will be copied from.
      * @return a Collection of the corresponding {@link ThreadInfoSample}s
      */
-    public static Collection<ThreadInfoSample> from(ThreadInfo[] threadInfos) {
+    public static Collection<ThreadInfoSample> from(
+            ThreadInfo[] threadInfos, long[] requestedThreadIds) {
         Collection<ThreadInfoSample> result = new ArrayList<>();
-        for (ThreadInfo threadInfo : threadInfos) {
+        for (int i = 0; i < threadInfos.length; i++) {
+            ThreadInfo threadInfo = threadInfos[i];
             if (threadInfo == null) {
-                LOG.warn("Missing thread info.");
+                LOG.warn(
+                        "FlameGraphs: thread {} is not alive or does not exist.",
+                        requestedThreadIds[i]);
             } else {

Review Comment:
   I'd still prefer to handle this at the call site and keep the invariant that every `ThreadInfo` is non-null. Arrays with nulls are a nightmare, and I don't see why they are necessary here (this is not a concurrent section, where nulls sometimes cannot be avoided).
   
   Btw, do we really want to log warnings? I'd imagine that this is a common case for bounded applications, and it would spam the log for larger setups.
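   
   A rough sketch of the call-site variant, assuming the samples come from `ThreadMXBean.getThreadInfo(long[], int)`, which sets `null` for threads that are not alive or do not exist. The helper name `liveThreadInfos` is hypothetical:
   
   ```java
   import java.lang.management.ManagementFactory;
   import java.lang.management.ThreadInfo;
   import java.lang.management.ThreadMXBean;
   import java.util.ArrayList;
   import java.util.List;
   
   /** Filters null entries where the JVM is queried, so downstream code never sees them. */
   public class NonNullThreadInfoDemo {
   
       /** Hypothetical call-site helper: returns only the infos of threads that are alive. */
       static List<ThreadInfo> liveThreadInfos(long[] threadIds, int maxDepth) {
           ThreadMXBean bean = ManagementFactory.getThreadMXBean();
           ThreadInfo[] infos = bean.getThreadInfo(threadIds, maxDepth);
           List<ThreadInfo> result = new ArrayList<>(infos.length);
           for (ThreadInfo info : infos) {
               if (info != null) {
                   result.add(info);
               }
           }
           return result;
       }
   
       public static void main(String[] args) throws InterruptedException {
           Thread finished = new Thread(() -> {});
           finished.start();
           finished.join(); // the thread is no longer alive after join()
   
           long[] ids = {Thread.currentThread().getId(), finished.getId()};
           for (ThreadInfo info : liveThreadInfos(ids, 10)) {
               System.out.println(info.getThreadName());
           }
       }
   }
   ```
   
   With this shape, the `from(...)` method can take only non-null elements and needs neither the id array nor a log statement.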



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/IdleTestTask.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/** The test task that creates an idle (sleeping) thread. */
+public class IdleTestTask implements SampleableTask {
+
+    private final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
+    private final Thread thread;
+
+    /** Instantiates a new idle test task with default sleep duration (10s). */
+    public IdleTestTask() {
+        this(10000L);
+    }

Review Comment:
   Inline this so the tests become easier to read?
   Also, should you wait indefinitely in case of Azure hiccups?
   What happens with spurious wakeups? Do tests fail then?
   
   A clean solution would be to have a `run` like this:
   ```java
   while (!stopped) {
       try {
           Thread.sleep(100);
       } catch (InterruptedException e) {
           // ignore; the loop condition decides when to exit
       }
   }
   ```
   
   Then your test code is responsible for setting the `volatile boolean stopped` and shutting the thread down (ideally in a `finally` clause).
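   
   Put together, the task and the test-side shutdown could look roughly like this. It is only a sketch: `IdleTask`, `stop()` and `runWithIdleThread` are hypothetical names, not the PR's `IdleTestTask` API:
   
   ```java
   import java.util.function.Predicate;
   
   /** Idle task that runs until stopped, plus a helper that always shuts it down. */
   public class StoppableIdleTaskDemo {
   
       /** Hypothetical idle task; it sleeps in a loop until stop() is called. */
       static final class IdleTask implements Runnable {
           private volatile boolean stopped;
   
           @Override
           public void run() {
               while (!stopped) {
                   try {
                       Thread.sleep(100);
                   } catch (InterruptedException e) {
                       // Ignored on purpose: the loop condition decides when to exit,
                       // so an interrupt or early wakeup does not end the task.
                   }
               }
           }
   
           void stop() {
               stopped = true;
           }
       }
   
       /** Runs the check while the idle thread is alive, then shuts the thread down. */
       static boolean runWithIdleThread(Predicate<Thread> check) {
           IdleTask task = new IdleTask();
           Thread thread = new Thread(task, "idle-test-task");
           thread.start();
           try {
               return check.test(thread);
           } finally {
               task.stop();
               thread.interrupt(); // wake the sleeper so shutdown does not wait out the sleep
               try {
                   thread.join();
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt(); // preserve the caller's interrupt status
               }
           }
       }
   
       public static void main(String[] args) {
           System.out.println(runWithIdleThread(Thread::isAlive)); // prints "true"
       }
   }
   ```
   
   Because the thread lives until the test stops it, there is no fixed sleep duration to tune, and a slow CI machine cannot make the thread exit before it is sampled.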


