You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/02/04 08:02:26 UTC

[flink] 08/10: [FLINK-25432][runtime] Makes JobManagerMetricGroup implement LocallyCleanableResource and GloballyCleanableResource

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a6ef34392638bf724f020071de3d4f118ee753cf
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Dec 15 12:42:22 2021 +0100

    [FLINK-25432][runtime] Makes JobManagerMetricGroup implement LocallyCleanableResource and GloballyCleanableResource
---
 .../org/apache/flink/runtime/dispatcher/Dispatcher.java    | 10 +++++++++-
 .../runtime/metrics/groups/JobManagerMetricGroup.java      | 14 +++++++++++---
 .../flink/runtime/metrics/groups/JobManagerGroupTest.java  |  5 +++--
 3 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 92fe36b..6599115 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -902,7 +902,15 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     }
 
     private void cleanUpRemainingJobData(JobID jobId, boolean jobGraphRemoved) {
-        jobManagerMetricGroup.removeJob(jobId);
+        try {
+            jobManagerMetricGroup.globalCleanup(jobId);
+        } catch (Exception e) {
+            log.warn(
+                    "Could not properly clean data for job {} stored in JobManager metric group",
+                    jobId,
+                    e);
+        }
+
         if (jobGraphRemoved) {
             try {
                 highAvailabilityServices.globalCleanup(jobId);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
index 431dab4..8d630c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
@@ -20,12 +20,16 @@ package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Special {@link org.apache.flink.metrics.MetricGroup} representing a JobManager.
@@ -33,7 +37,8 @@ import java.util.Map;
  * <p>Contains extra logic for adding jobs with tasks, and removing jobs when they do not contain
  * tasks any more
  */
-public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetricGroup> {
+public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetricGroup>
+        implements LocallyCleanableResource {
 
     private final Map<JobID, JobManagerJobMetricGroup> jobs = new HashMap<>();
 
@@ -84,9 +89,10 @@ public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetric
         }
     }
 
-    public void removeJob(JobID jobId) {
+    @Override
+    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor ignoredExecutor) {
         if (jobId == null) {
-            return;
+            return FutureUtils.completedVoidFuture();
         }
 
         synchronized (this) {
@@ -95,6 +101,8 @@ public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetric
                 containedGroup.close();
             }
         }
+
+        return FutureUtils.completedVoidFuture();
     }
 
     public int numRegisteredJobMetricGroups() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index 958c01a..4a06fa6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.metrics.MetricRegistryTestUtils;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
 
 import org.junit.Test;
 
@@ -62,12 +63,12 @@ public class JobManagerGroupTest extends TestLogger {
 
         assertEquals(2, group.numRegisteredJobMetricGroups());
 
-        group.removeJob(jid1);
+        group.localCleanupAsync(jid1, Executors.directExecutor()).join();
 
         assertTrue(jmJobGroup11.isClosed());
         assertEquals(1, group.numRegisteredJobMetricGroups());
 
-        group.removeJob(jid2);
+        group.localCleanupAsync(jid2, Executors.directExecutor()).join();
 
         assertTrue(jmJobGroup21.isClosed());
         assertEquals(0, group.numRegisteredJobMetricGroups());