You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/02/04 08:02:26 UTC
[flink] 08/10: [FLINK-25432][runtime] Makes JobManagerMetricGroup implement LocallyCleanableResource and GloballyCleanableResource
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a6ef34392638bf724f020071de3d4f118ee753cf
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Dec 15 12:42:22 2021 +0100
[FLINK-25432][runtime] Makes JobManagerMetricGroup implement LocallyCleanableResource and GloballyCleanableResource
---
.../org/apache/flink/runtime/dispatcher/Dispatcher.java | 10 +++++++++-
.../runtime/metrics/groups/JobManagerMetricGroup.java | 14 +++++++++++---
.../flink/runtime/metrics/groups/JobManagerGroupTest.java | 5 +++--
3 files changed, 23 insertions(+), 6 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 92fe36b..6599115 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -902,7 +902,15 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
}
private void cleanUpRemainingJobData(JobID jobId, boolean jobGraphRemoved) {
- jobManagerMetricGroup.removeJob(jobId);
+ try {
+ jobManagerMetricGroup.globalCleanup(jobId);
+ } catch (Exception e) {
+ log.warn(
+ "Could not properly clean data for job {} stored in JobManager metric group",
+ jobId,
+ e);
+ }
+
if (jobGraphRemoved) {
try {
highAvailabilityServices.globalCleanup(jobId);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
index 431dab4..8d630c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
@@ -20,12 +20,16 @@ package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+import org.apache.flink.util.concurrent.FutureUtils;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Special {@link org.apache.flink.metrics.MetricGroup} representing a JobManager.
@@ -33,7 +37,8 @@ import java.util.Map;
* <p>Contains extra logic for adding jobs with tasks, and removing jobs when they do not contain
* tasks any more
*/
-public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetricGroup> {
+public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetricGroup>
+ implements LocallyCleanableResource {
private final Map<JobID, JobManagerJobMetricGroup> jobs = new HashMap<>();
@@ -84,9 +89,10 @@ public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetric
}
}
- public void removeJob(JobID jobId) {
+ @Override
+ public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor ignoredExecutor) {
if (jobId == null) {
- return;
+ return FutureUtils.completedVoidFuture();
}
synchronized (this) {
@@ -95,6 +101,8 @@ public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetric
containedGroup.close();
}
}
+
+ return FutureUtils.completedVoidFuture();
}
public int numRegisteredJobMetricGroups() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index 958c01a..4a06fa6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.metrics.MetricRegistryTestUtils;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
import org.junit.Test;
@@ -62,12 +63,12 @@ public class JobManagerGroupTest extends TestLogger {
assertEquals(2, group.numRegisteredJobMetricGroups());
- group.removeJob(jid1);
+ group.localCleanupAsync(jid1, Executors.directExecutor()).join();
assertTrue(jmJobGroup11.isClosed());
assertEquals(1, group.numRegisteredJobMetricGroups());
- group.removeJob(jid2);
+ group.localCleanupAsync(jid2, Executors.directExecutor()).join();
assertTrue(jmJobGroup21.isClosed());
assertEquals(0, group.numRegisteredJobMetricGroups());