You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/12/06 08:05:14 UTC

[flink] branch release-1.16 updated (da9e997752c -> 0f98c6aad1f)

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


    from da9e997752c [FLINK-30291][Connector/DynamoDB] Update docs to render DynamoDB connector docs
     new eef91f8be7b [hotfix][rest] Minor clean-ups in JobVertexThreadInfoTracker.
     new 0f98c6aad1f [FLINK-30239][rest] Fix the bug that FlameGraph cannot be generated due to using ImmutableSet incorrectly

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../threadinfo/JobVertexThreadInfoTracker.java     |  8 +++---
 .../threadinfo/JobVertexThreadInfoTrackerTest.java | 31 ++++++++++++++++++++--
 2 files changed, 34 insertions(+), 5 deletions(-)


[flink] 01/02: [hotfix][rest] Minor clean-ups in JobVertexThreadInfoTracker.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eef91f8be7beaabae94596bfd36807a4c955f266
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Wed Nov 30 00:26:00 2022 +0800

    [hotfix][rest] Minor clean-ups in JobVertexThreadInfoTracker.
---
 .../runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java   | 6 ++++--
 .../webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java       | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
index 45b469632b1..57072263b73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
@@ -274,7 +274,9 @@ public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVert
         }
 
         return executionAttemptsByLocation.entrySet().stream()
-                .collect(Collectors.toMap(e -> e.getKey(), e -> ImmutableSet.copyOf(e.getValue())));
+                .collect(
+                        Collectors.toMap(
+                                Map.Entry::getKey, e -> ImmutableSet.copyOf(e.getValue())));
     }
 
     @VisibleForTesting
@@ -353,7 +355,7 @@ public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVert
                         vertexStatsCache.put(key, createStatsFn.apply(threadInfoStats));
                         resultAvailableFuture.complete(null);
                     } else {
-                        LOG.debug(
+                        LOG.error(
                                 "Failed to gather a thread info sample for {}",
                                 vertex.getName(),
                                 throwable);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
index 845b5a798f1..ec6855cafa1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
@@ -135,7 +135,7 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
         Optional<JobVertexThreadInfoStats> result =
                 tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
         // cached result is returned instead of unusedThreadInfoStats
-        assertThat(threadInfoStatsDefaultSample).isEqualTo(result.get());
+        assertThat(result).isPresent().hasValue(threadInfoStatsDefaultSample);
     }
 
     /** Tests that cached result is NOT reused after refresh interval. */


[flink] 02/02: [FLINK-30239][rest] Fix the bug that FlameGraph cannot be generated due to using ImmutableSet incorrectly

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0f98c6aad1f5bc8ad25f7608c419a5e396b8e8ac
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Mon Dec 5 20:58:54 2022 +0800

    [FLINK-30239][rest] Fix the bug that FlameGraph cannot be generated due to using ImmutableSet incorrectly
    
    This closes #21420
---
 .../threadinfo/JobVertexThreadInfoTracker.java     |  2 +-
 .../threadinfo/JobVertexThreadInfoTrackerTest.java | 29 +++++++++++++++++++++-
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
index 57072263b73..6933655c0b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
@@ -269,7 +269,7 @@ public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVert
 
                 ExecutionAttemptID attemptId = execution.getAttemptId();
                 groupedAttemptIds.add(attemptId);
-                executionAttemptsByLocation.put(tmLocation, ImmutableSet.copyOf(groupedAttemptIds));
+                executionAttemptsByLocation.put(tmLocation, groupedAttemptIds);
             }
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
index ec6855cafa1..f7d4ebfd288 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
@@ -20,16 +20,21 @@ package org.apache.flink.runtime.webmonitor.threadinfo;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.messages.ThreadInfoSample;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.runtime.util.JvmUtils;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.util.TestLogger;
@@ -49,6 +54,7 @@ import javax.annotation.Nonnull;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -65,7 +71,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
+import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createScheduler;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 /** Tests for the {@link JobVertexThreadInfoTracker}. */
@@ -74,6 +82,12 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
     private static final int REQUEST_ID = 0;
     private static final ExecutionJobVertex EXECUTION_JOB_VERTEX = createExecutionJobVertex();
     private static final ExecutionVertex[] TASK_VERTICES = EXECUTION_JOB_VERTEX.getTaskVertices();
+    private static final Set<ExecutionAttemptID> ATTEMPT_IDS =
+            Arrays.stream(TASK_VERTICES)
+                    .map(
+                            executionVertex ->
+                                    executionVertex.getCurrentExecutionAttempt().getAttemptId())
+                    .collect(Collectors.toSet());
     private static final JobID JOB_ID = new JobID();
 
     private static ThreadInfoSample threadInfoSample;
@@ -338,8 +352,18 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
     private static ExecutionJobVertex createExecutionJobVertex() {
         try {
             JobVertex jobVertex = new JobVertex("testVertex");
+            jobVertex.setParallelism(10);
             jobVertex.setInvokableClass(AbstractInvokable.class);
-            return ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex);
+
+            final SchedulerBase scheduler =
+                    createScheduler(
+                            JobGraphTestUtils.streamingJobGraph(jobVertex),
+                            ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                            new DirectScheduledExecutorService());
+            final ExecutionGraph eg = scheduler.getExecutionGraph();
+            scheduler.startScheduling();
+            ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+            return scheduler.getExecutionJobVertex(jobVertex.getID());
         } catch (Exception e) {
             throw new RuntimeException("Failed to create ExecutionJobVertex.");
         }
@@ -382,6 +406,9 @@ public class JobVertexThreadInfoTrackerTest extends TestLogger {
                 int ignored2,
                 Duration ignored3,
                 int ignored4) {
+            assertThat(executionsWithGateways.size() == 1).isTrue();
+            assertThat(executionsWithGateways.keySet().iterator().next()).isEqualTo(ATTEMPT_IDS);
+
             return CompletableFuture.completedFuture(
                     jobVertexThreadInfoStats[(counter++) % jobVertexThreadInfoStats.length]);
         }