You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/06/20 09:55:04 UTC

[flink] branch release-1.14 updated: [FLINK-27420] Recreate metric groups for each new RM to avoid metric loss

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new ff060543bc8 [FLINK-27420] Recreate metric groups for each new RM to avoid metric loss
ff060543bc8 is described below

commit ff060543bc8867bdc97e4c3138c4999894f2909e
Author: Ben Augarten <ba...@gmail.com>
AuthorDate: Wed May 4 18:12:27 2022 -0700

    [FLINK-27420] Recreate metric groups for each new RM to avoid metric loss
    
    This closes #20019
---
 .../resourcemanager/ResourceManagerFactory.java    | 15 ++---
 .../ResourceManagerProcessContext.java             | 23 ++++----
 .../ResourceManagerServiceImplTest.java            | 66 ++++++++++++++++++++++
 3 files changed, 83 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
index fab804b7b55..b8901fe03f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
@@ -61,11 +61,6 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
             Executor ioExecutor)
             throws ConfigurationException {
 
-        final ResourceManagerMetricGroup resourceManagerMetricGroup =
-                ResourceManagerMetricGroup.create(metricRegistry, hostname);
-        final SlotManagerMetricGroup slotManagerMetricGroup =
-                SlotManagerMetricGroup.create(metricRegistry, hostname);
-
         final Configuration runtimeServicesAndRmConfig =
                 getEffectiveConfigurationForResourceManagerAndRuntimeServices(configuration);
 
@@ -84,8 +79,8 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
                 fatalErrorHandler,
                 clusterInformation,
                 webInterfaceUrl,
-                resourceManagerMetricGroup,
-                slotManagerMetricGroup,
+                metricRegistry,
+                hostname,
                 ioExecutor);
     }
 
@@ -98,7 +93,8 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
                         context.getRmRuntimeServicesConfig(),
                         context.getRpcService(),
                         context.getHighAvailabilityServices(),
-                        context.getSlotManagerMetricGroup());
+                        SlotManagerMetricGroup.create(
+                                context.getMetricRegistry(), context.getHostname()));
 
         return createResourceManager(
                 context.getRmConfig(),
@@ -109,7 +105,8 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
                 context.getFatalErrorHandler(),
                 context.getClusterInformation(),
                 context.getWebInterfaceUrl(),
-                context.getResourceManagerMetricGroup(),
+                ResourceManagerMetricGroup.create(
+                        context.getMetricRegistry(), context.getHostname()),
                 resourceManagerRuntimeServices,
                 context.getIoExecutor());
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerProcessContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerProcessContext.java
index 79d8ed13d90..ed72b75be2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerProcessContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerProcessContext.java
@@ -22,8 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
-import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
@@ -48,8 +47,8 @@ public class ResourceManagerProcessContext {
     private final FatalErrorHandler fatalErrorHandler;
     private final ClusterInformation clusterInformation;
     @Nullable private final String webInterfaceUrl;
-    private final ResourceManagerMetricGroup resourceManagerMetricGroup;
-    private final SlotManagerMetricGroup slotManagerMetricGroup;
+    private final MetricRegistry metricRegistry;
+    private final String hostname;
     private final Executor ioExecutor;
 
     public ResourceManagerProcessContext(
@@ -61,8 +60,8 @@ public class ResourceManagerProcessContext {
             FatalErrorHandler fatalErrorHandler,
             ClusterInformation clusterInformation,
             @Nullable String webInterfaceUrl,
-            ResourceManagerMetricGroup resourceManagerMetricGroup,
-            SlotManagerMetricGroup slotManagerMetricGroup,
+            MetricRegistry metricRegistry,
+            String hostname,
             Executor ioExecutor) {
         this.rmConfig = checkNotNull(rmConfig);
         this.rmRuntimeServicesConfig = checkNotNull(rmRuntimeServicesConfig);
@@ -71,8 +70,8 @@ public class ResourceManagerProcessContext {
         this.heartbeatServices = checkNotNull(heartbeatServices);
         this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
         this.clusterInformation = checkNotNull(clusterInformation);
-        this.resourceManagerMetricGroup = checkNotNull(resourceManagerMetricGroup);
-        this.slotManagerMetricGroup = checkNotNull(slotManagerMetricGroup);
+        this.metricRegistry = checkNotNull(metricRegistry);
+        this.hostname = checkNotNull(hostname);
         this.ioExecutor = checkNotNull(ioExecutor);
 
         this.webInterfaceUrl = webInterfaceUrl;
@@ -111,12 +110,12 @@ public class ResourceManagerProcessContext {
         return webInterfaceUrl;
     }
 
-    public ResourceManagerMetricGroup getResourceManagerMetricGroup() {
-        return resourceManagerMetricGroup;
+    public MetricRegistry getMetricRegistry() {
+        return metricRegistry;
     }
 
-    public SlotManagerMetricGroup getSlotManagerMetricGroup() {
-        return slotManagerMetricGroup;
+    public String getHostname() {
+        return hostname;
     }
 
     public Executor getIoExecutor() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
index ec793d501b8..80c2fa6af8b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 import org.apache.flink.runtime.rpc.RpcUtils;
@@ -35,12 +36,17 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeoutException;
 
@@ -490,6 +496,59 @@ public class ResourceManagerServiceImplTest extends TestLogger {
         deregisterApplicationFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
     }
 
+    @Test
+    public void grantAndRevokeLeadership_verifyMetrics() throws Exception {
+        final Set<String> registeredMetrics = Collections.newSetFromMap(new ConcurrentHashMap<>());
+        TestingMetricRegistry metricRegistry =
+                TestingMetricRegistry.builder()
+                        .setRegisterConsumer((a, b, c) -> registeredMetrics.add(b))
+                        .setUnregisterConsumer((a, b, c) -> registeredMetrics.remove(b))
+                        .build();
+
+        final TestingResourceManagerFactory rmFactory = rmFactoryBuilder.build();
+        resourceManagerService =
+                ResourceManagerServiceImpl.create(
+                        rmFactory,
+                        new Configuration(),
+                        rpcService,
+                        haService,
+                        heartbeatServices,
+                        fatalErrorHandler,
+                        clusterInformation,
+                        null,
+                        metricRegistry,
+                        "localhost",
+                        ForkJoinPool.commonPool());
+        resourceManagerService.start();
+
+        Assert.assertEquals(0, registeredMetrics.size()); // nothing registered before leadership
+        // grant leadership for the first time; the started RM is expected to register its metrics
+        leaderElectionService.isLeader(UUID.randomUUID());
+
+        assertRmStarted();
+        Set<String> expectedMetrics = new HashSet<>();
+        expectedMetrics.add(MetricNames.NUM_REGISTERED_TASK_MANAGERS);
+        expectedMetrics.add(MetricNames.TASK_SLOTS_TOTAL);
+        expectedMetrics.add(MetricNames.TASK_SLOTS_AVAILABLE);
+        Assert.assertTrue(
+                "Expected RM to register leader metrics",
+                registeredMetrics.containsAll(expectedMetrics));
+
+        // revoke leadership and block until the old RM has terminated before checking the set
+        revokeLeadership();
+
+        Set<String> intersection = new HashSet<>(registeredMetrics); // copy; keep the live set
+        intersection.retainAll(expectedMetrics);
+        Assert.assertTrue("Expected RM to unregister leader metrics", intersection.isEmpty());
+
+        leaderElectionService.isLeader(UUID.randomUUID()); // grant leadership a second time
+
+        assertRmStarted();
+        Assert.assertTrue(
+                "Expected RM to re-register leader metrics",
+                registeredMetrics.containsAll(expectedMetrics));
+    }
+
     private static void blockOnFuture(CompletableFuture<?> future) {
         try {
             future.get();
@@ -511,4 +570,11 @@ public class ResourceManagerServiceImplTest extends TestLogger {
     private void assertRmStarted() throws Exception {
         leaderElectionService.getConfirmationFuture().get(TIMEOUT.getSize(), TIMEOUT.getUnit());
     }
+
+    private void revokeLeadership() {
+        ResourceManager<?> leaderResourceManager =
+                resourceManagerService.getLeaderResourceManager(); // fetched before revoking
+        leaderElectionService.notLeader();
+        blockOnFuture(leaderResourceManager.getTerminationFuture()); // wait for RM termination
+    }
 }